Posted to common-commits@hadoop.apache.org by el...@apache.org on 2018/10/08 08:47:43 UTC
[20/50] [abbrv] hadoop git commit: HDFS-13947. Review of DirectoryScanner Class. Contributed by BELUGA BEHR.
HDFS-13947. Review of DirectoryScanner Class. Contributed by BELUGA BEHR.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1dc0adfa
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1dc0adfa
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1dc0adfa
Branch: refs/heads/HEAD
Commit: 1dc0adfac0ee4821c67366728c70be9b59477b0f
Parents: 7051bd7
Author: Inigo Goiri <in...@apache.org>
Authored: Wed Oct 3 11:19:57 2018 -0700
Committer: Inigo Goiri <in...@apache.org>
Committed: Wed Oct 3 11:19:57 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 +-
.../hadoop/hdfs/server/datanode/DataNode.java | 2 +-
.../hdfs/server/datanode/DirectoryScanner.java | 601 ++++++++++---------
.../server/datanode/TestDirectoryScanner.java | 243 ++++----
.../fsdataset/impl/TestProvidedImpl.java | 13 +-
.../namenode/TestListCorruptFileBlocks.java | 50 +-
6 files changed, 481 insertions(+), 430 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dc0adfa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index d8024dc..42709de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -709,7 +709,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY =
"dfs.datanode.directoryscan.throttle.limit.ms.per.sec";
public static final int
- DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT = 1000;
+ DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT = -1;
public static final String DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface";
public static final String DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default";
public static final String DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver";
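Note on the default change above: both the old default of 1000 and the new default of -1 leave throttling off (the old constructor treated anything outside 1..999 as "use the default", and a limit of 1000 never triggered a pause); -1 simply states "disabled" directly and lets the constructor drop a special case. A minimal standalone sketch of how the key is consumed (class name hypothetical, not part of this patch):

import org.apache.hadoop.conf.Configuration;

public class ThrottleDefaultSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Any value outside (0, 1000) ms/sec leaves the scanner unthrottled.
    int throttle = conf.getInt(
        "dfs.datanode.directoryscan.throttle.limit.ms.per.sec", -1);
    boolean throttled = throttle > 0 && throttle < 1000;
    System.out.println("directory scanner throttled: " + throttled);
  }
}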
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dc0adfa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 270e30b..40f80a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1086,7 +1086,7 @@ public class DataNode extends ReconfigurableBase
reason = "verifcation is not supported by SimulatedFSDataset";
}
if (reason == null) {
- directoryScanner = new DirectoryScanner(this, data, conf);
+ directoryScanner = new DirectoryScanner(data, conf);
directoryScanner.start();
} else {
LOG.info("Periodic Directory Tree Verification scan " +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dc0adfa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 99584d9..484899d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -17,17 +17,19 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -36,23 +38,27 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.lang3.time.FastDateFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
+import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StopWatch;
-import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
/**
* Periodically scans the data directories for block and block metadata files.
@@ -62,48 +68,48 @@ import org.apache.hadoop.util.Time;
public class DirectoryScanner implements Runnable {
private static final Logger LOG =
LoggerFactory.getLogger(DirectoryScanner.class);
- private static final int MILLIS_PER_SECOND = 1000;
- private static final String START_MESSAGE =
- "Periodic Directory Tree Verification scan"
- + " starting at %s with interval of %dms";
- private static final String START_MESSAGE_WITH_THROTTLE = START_MESSAGE
- + " and throttle limit of %dms/s";
+
+ private static final int DEFAULT_MAP_SIZE = 32768;
private final FsDatasetSpi<?> dataset;
private final ExecutorService reportCompileThreadPool;
private final ScheduledExecutorService masterThread;
private final long scanPeriodMsecs;
- private final int throttleLimitMsPerSec;
- private volatile boolean shouldRun = false;
+ private final long throttleLimitMsPerSec;
+ private final AtomicBoolean shouldRun = new AtomicBoolean();
+
private boolean retainDiffs = false;
- private final DataNode datanode;
/**
* Total combined wall clock time (in milliseconds) spent by the report
- * compiler threads executing. Used for testing purposes.
+ * compiler threads executing. Used for testing purposes.
*/
@VisibleForTesting
final AtomicLong timeRunningMs = new AtomicLong(0L);
+
/**
* Total combined wall clock time (in milliseconds) spent by the report
- * compiler threads blocked by the throttle. Used for testing purposes.
+ * compiler threads blocked by the throttle. Used for testing purposes.
*/
@VisibleForTesting
final AtomicLong timeWaitingMs = new AtomicLong(0L);
+
/**
* The complete list of block differences indexed by block pool ID.
*/
@VisibleForTesting
- final ScanInfoPerBlockPool diffs = new ScanInfoPerBlockPool();
+ final BlockPoolReport diffs = new BlockPoolReport();
+
/**
- * Statistics about the block differences in each blockpool, indexed by
- * block pool ID.
+ * Statistics about the block differences in each blockpool, indexed by block
+ * pool ID.
*/
@VisibleForTesting
- final Map<String, Stats> stats = new HashMap<String, Stats>();
-
+ final Map<String, Stats> stats;
+
/**
- * Allow retaining diffs for unit test and analysis. Defaults to false (off)
+ * Allow retaining diffs for unit test and analysis. Defaults to false (off).
+ *
* @param b whether to retain diffs
*/
@VisibleForTesting
@@ -123,92 +129,157 @@ public class DirectoryScanner implements Runnable {
long missingMemoryBlocks = 0;
long mismatchBlocks = 0;
long duplicateBlocks = 0;
-
+
/**
* Create a new Stats object for the given blockpool ID.
+ *
* @param bpid blockpool ID
*/
public Stats(String bpid) {
this.bpid = bpid;
}
-
+
@Override
public String toString() {
- return "BlockPool " + bpid
- + " Total blocks: " + totalBlocks + ", missing metadata files:"
- + missingMetaFile + ", missing block files:" + missingBlockFile
- + ", missing blocks in memory:" + missingMemoryBlocks
- + ", mismatched blocks:" + mismatchBlocks;
+ return "BlockPool " + bpid + " Total blocks: " + totalBlocks
+ + ", missing metadata files: " + missingMetaFile
+ + ", missing block files: " + missingBlockFile
+ + ", missing blocks in memory: " + missingMemoryBlocks
+ + ", mismatched blocks: " + mismatchBlocks;
}
}
/**
* Helper class for compiling block info reports from report compiler threads.
+ * Contains a volume, a set of block pool IDs, and a collection of ScanInfo
+ * objects. If a block pool exists but has no ScanInfo objects associated with
+ * it, there will be no mapping for that particular block pool.
*/
- static class ScanInfoPerBlockPool extends
- HashMap<String, LinkedList<ScanInfo>> {
-
+ @VisibleForTesting
+ public static class ScanInfoVolumeReport {
+
+ @SuppressWarnings("unused")
private static final long serialVersionUID = 1L;
+ private final FsVolumeSpi volume;
+
+ private final BlockPoolReport blockPoolReport;
+
/**
* Create a new volume report.
+     *
+     * @param volume the volume being scanned
*/
- ScanInfoPerBlockPool() {super();}
+ ScanInfoVolumeReport(final FsVolumeSpi volume) {
+ this.volume = volume;
+ this.blockPoolReport = new BlockPoolReport();
+ }
/**
* Create a new volume report initialized with the given block pools.
- * See {@link java.util.HashMap#HashMap(int)}.
*
* @param volume the volume being scanned
* @param blockPools initial list of known block pools
*/
- ScanInfoPerBlockPool(int sz) {super(sz);}
-
+ ScanInfoVolumeReport(final FsVolumeSpi volume,
+ final Collection<String> blockPools) {
+ this.volume = volume;
+ this.blockPoolReport = new BlockPoolReport(blockPools);
+ }
+
+ public void addAll(final String bpid,
+ final Collection<ScanInfo> scanInfos) {
+ this.blockPoolReport.addAll(bpid, scanInfos);
+ }
+
+ public Set<String> getBlockPoolIds() {
+ return this.blockPoolReport.getBlockPoolIds();
+ }
+
+ public List<ScanInfo> getScanInfo(final String bpid) {
+ return this.blockPoolReport.getScanInfo(bpid);
+ }
+
+ public FsVolumeSpi getVolume() {
+ return volume;
+ }
+
+ @Override
+ public String toString() {
+ return "ScanInfoVolumeReport [volume=" + volume + ", blockPoolReport="
+ + blockPoolReport + "]";
+ }
+ }
+
+ /**
+ * Helper class for compiling block info reports per block pool.
+ */
+ @VisibleForTesting
+ public static class BlockPoolReport {
+
+ @SuppressWarnings("unused")
+ private static final long serialVersionUID = 1L;
+
+ private final Set<String> blockPools;
+
+ private final ListMultimap<String, ScanInfo> map;
+
/**
- * Merges {@code that} ScanInfoPerBlockPool into this one
+ * Create a block pool report.
*
- * @param that ScanInfoPerBlockPool to merge
*/
- public void addAll(ScanInfoPerBlockPool that) {
- if (that == null) return;
-
- for (Entry<String, LinkedList<ScanInfo>> entry : that.entrySet()) {
- String bpid = entry.getKey();
- LinkedList<ScanInfo> list = entry.getValue();
-
- if (this.containsKey(bpid)) {
- //merge that per-bpid linked list with this one
- this.get(bpid).addAll(list);
- } else {
- //add that new bpid and its linked list to this
- this.put(bpid, list);
- }
- }
+ BlockPoolReport() {
+ this.blockPools = new HashSet<>(2);
+ this.map = ArrayListMultimap.create(2, DEFAULT_MAP_SIZE);
}
-
+
/**
- * Convert all the LinkedList values in this ScanInfoPerBlockPool map
- * into sorted arrays, and return a new map of these arrays per blockpool
+ * Create a new block pool report initialized with the given block pools.
*
- * @return a map of ScanInfo arrays per blockpool
+ * @param blockPools initial list of known block pools
*/
- public Map<String, ScanInfo[]> toSortedArrays() {
- Map<String, ScanInfo[]> result =
- new HashMap<String, ScanInfo[]>(this.size());
-
- for (Entry<String, LinkedList<ScanInfo>> entry : this.entrySet()) {
- String bpid = entry.getKey();
- LinkedList<ScanInfo> list = entry.getValue();
-
- // convert list to array
- ScanInfo[] record = list.toArray(new ScanInfo[list.size()]);
+ BlockPoolReport(final Collection<String> blockPools) {
+ this.blockPools = new HashSet<>(blockPools);
+ this.map = ArrayListMultimap.create(blockPools.size(), DEFAULT_MAP_SIZE);
+
+ }
+
+ public void addAll(final String bpid,
+ final Collection<ScanInfo> scanInfos) {
+ this.blockPools.add(bpid);
+ this.map.putAll(bpid, scanInfos);
+ }
+
+ public void sortBlocks() {
+ for (final String bpid : this.map.keySet()) {
+ final List<ScanInfo> list = this.map.get(bpid);
// Sort array based on blockId
- Arrays.sort(record);
- result.put(bpid, record);
+ Collections.sort(list);
}
- return result;
}
- }
+ public Set<String> getBlockPoolIds() {
+ return Collections.unmodifiableSet(this.blockPools);
+ }
+
+ public List<ScanInfo> getScanInfo(final String bpid) {
+ return this.map.get(bpid);
+ }
+
+ public Collection<Map.Entry<String, ScanInfo>> getEntries() {
+ return Collections.unmodifiableCollection(this.map.entries());
+ }
+
+ public void clear() {
+ this.map.clear();
+ this.blockPools.clear();
+ }
+
+ @Override
+ public String toString() {
+ return "BlockPoolReport [blockPools=" + blockPools + ", map=" + map + "]";
+ }
+ }
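BlockPoolReport replaces the old HashMap<String, LinkedList<ScanInfo>> subclass with a Guava ListMultimap, which keeps one value list per block pool ID and hands back live, sortable list views. A rough standalone illustration of the pattern, with bare block IDs standing in for ScanInfo objects (class name hypothetical, not part of this patch):

import java.util.Arrays;
import java.util.Collections;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;

public class MultimapReportSketch {
  public static void main(String[] args) {
    ListMultimap<String, Long> report = ArrayListMultimap.create();
    // putAll appends to the per-pool list, merging reports as they arrive.
    report.putAll("BP-1", Arrays.asList(9L, 3L, 7L));
    report.putAll("BP-1", Collections.singletonList(1L));
    // get() returns a live List view, so it can be sorted in place -
    // the same trick sortBlocks() uses on the real ScanInfo entries.
    Collections.sort(report.get("BP-1"));
    System.out.println(report.get("BP-1")); // [1, 3, 7, 9]
  }
}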
/**
* Create a new directory scanner, but don't cycle it running yet.
@@ -217,75 +288,58 @@ public class DirectoryScanner implements Runnable {
* @param dataset the dataset to scan
* @param conf the Configuration object
*/
- public DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset,
- Configuration conf) {
- this.datanode = datanode;
+ public DirectoryScanner(FsDatasetSpi<?> dataset, Configuration conf) {
this.dataset = dataset;
+ this.stats = new HashMap<>(DEFAULT_MAP_SIZE);
int interval = (int) conf.getTimeDuration(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT,
TimeUnit.SECONDS);
- scanPeriodMsecs = interval * MILLIS_PER_SECOND; //msec
- int throttle =
- conf.getInt(
- DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
- DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
+ scanPeriodMsecs = TimeUnit.SECONDS.toMillis(interval);
- if ((throttle > MILLIS_PER_SECOND) || (throttle <= 0)) {
- if (throttle > MILLIS_PER_SECOND) {
- LOG.error(
- DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY
- + " set to value above 1000 ms/sec. Assuming default value of " +
- DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
- } else {
- LOG.error(
- DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY
- + " set to value below 1 ms/sec. Assuming default value of " +
- DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
- }
+ int throttle = conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
+ DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
- throttleLimitMsPerSec =
+ if (throttle >= TimeUnit.SECONDS.toMillis(1)) {
+ LOG.warn(
+ "{} set to value above 1000 ms/sec. Assuming default value of {}",
+ DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
+ DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
+ throttle =
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT;
- } else {
- throttleLimitMsPerSec = throttle;
}
- int threads =
+ throttleLimitMsPerSec = throttle;
+
+ int threads =
conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
- DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT);
+ DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT);
- reportCompileThreadPool = Executors.newFixedThreadPool(threads,
- new Daemon.DaemonFactory());
- masterThread = new ScheduledThreadPoolExecutor(1,
- new Daemon.DaemonFactory());
+ reportCompileThreadPool =
+ Executors.newFixedThreadPool(threads, new Daemon.DaemonFactory());
+
+ masterThread =
+ new ScheduledThreadPoolExecutor(1, new Daemon.DaemonFactory());
}
/**
- * Start the scanner. The scanner will run every
+ * Start the scanner. The scanner will run every
* {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY} seconds.
*/
void start() {
- shouldRun = true;
- long offset = ThreadLocalRandom.current().nextInt(
- (int) (scanPeriodMsecs/MILLIS_PER_SECOND)) * MILLIS_PER_SECOND; //msec
- long firstScanTime = Time.now() + offset;
- String logMsg;
-
- if (throttleLimitMsPerSec < MILLIS_PER_SECOND) {
- logMsg = String.format(START_MESSAGE_WITH_THROTTLE,
- FastDateFormat.getInstance().format(firstScanTime), scanPeriodMsecs,
- throttleLimitMsPerSec);
- } else {
- logMsg = String.format(START_MESSAGE,
- FastDateFormat.getInstance().format(firstScanTime), scanPeriodMsecs);
- }
-
- LOG.info(logMsg);
- masterThread.scheduleAtFixedRate(this, offset, scanPeriodMsecs,
- TimeUnit.MILLISECONDS);
+ shouldRun.set(true);
+ long firstScanTime = ThreadLocalRandom.current().nextLong(scanPeriodMsecs);
+
+ LOG.info(
+ "Periodic Directory Tree Verification scan starting in {}ms with interval of {}ms and throttle limit of {}ms/s",
+ firstScanTime, scanPeriodMsecs, throttleLimitMsPerSec);
+
+ masterThread.scheduleAtFixedRate(this, firstScanTime, scanPeriodMsecs,
+ TimeUnit.MILLISECONDS);
}
-
+
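start() now draws the initial delay uniformly from [0, scanPeriodMsecs) at millisecond granularity (the old code rounded to whole seconds) and logs a relative delay instead of a formatted wall-clock time. A stripped-down sketch of the same scheduling shape; the six-hour period here is an assumption standing in for the configured interval:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class RandomFirstDelaySketch {
  public static void main(String[] args) {
    long periodMs = TimeUnit.HOURS.toMillis(6); // assumed scan interval
    // A random first delay staggers scans so datanodes restarted together
    // do not all hammer their disks at the same moment.
    long firstDelayMs = ThreadLocalRandom.current().nextLong(periodMs);
    ScheduledExecutorService master = Executors.newScheduledThreadPool(1);
    master.scheduleAtFixedRate(() -> System.out.println("scan tick"),
        firstDelayMs, periodMs, TimeUnit.MILLISECONDS);
  }
}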
/**
* Return whether the scanner has been started.
*
@@ -293,7 +347,7 @@ public class DirectoryScanner implements Runnable {
*/
@VisibleForTesting
boolean getRunStatus() {
- return shouldRun;
+ return shouldRun.get();
}
/**
@@ -305,67 +359,69 @@ public class DirectoryScanner implements Runnable {
}
/**
- * Main program loop for DirectoryScanner. Runs {@link reconcile()}
- * and handles any exceptions.
+ * Main program loop for DirectoryScanner. Runs {@link #reconcile()} and
+ * handles any exceptions.
*/
@Override
public void run() {
+ if (!shouldRun.get()) {
+ // shutdown has been activated
+ LOG.warn(
+ "This cycle terminating immediately because 'shouldRun' has been deactivated");
+ return;
+ }
try {
- if (!shouldRun) {
- //shutdown has been activated
- LOG.warn("this cycle terminating immediately because 'shouldRun' has been deactivated");
- return;
- }
-
- //We're are okay to run - do it
- reconcile();
-
+ reconcile();
} catch (Exception e) {
- //Log and continue - allows Executor to run again next cycle
- LOG.error("Exception during DirectoryScanner execution - will continue next cycle", e);
+ // Log and continue - allows Executor to run again next cycle
+ LOG.error(
+ "Exception during DirectoryScanner execution - will continue next cycle",
+ e);
} catch (Error er) {
- //Non-recoverable error - re-throw after logging the problem
- LOG.error("System Error during DirectoryScanner execution - permanently terminating periodic scanner", er);
+ // Non-recoverable error - re-throw after logging the problem
+ LOG.error(
+ "System Error during DirectoryScanner execution - permanently terminating periodic scanner",
+ er);
throw er;
}
}
-
+
/**
- * Stops the directory scanner. This method will wait for 1 minute for the
+ * Stops the directory scanner. This method will wait for 1 minute for the
* main thread to exit and an additional 1 minute for the report compilation
- * threads to exit. If a thread does not exit in that time period, it is
- * left running, and an error is logged.
+ * threads to exit. If a thread does not exit in that time period, it is left
+ * running, and an error is logged.
*/
void shutdown() {
- if (!shouldRun) {
- LOG.warn("DirectoryScanner: shutdown has been called, but periodic scanner not started");
- } else {
- LOG.warn("DirectoryScanner: shutdown has been called");
+ LOG.info("Shutdown has been called");
+ if (!shouldRun.getAndSet(false)) {
+ LOG.warn("Shutdown has been called, but periodic scanner not started");
+ }
+ if (masterThread != null) {
+ masterThread.shutdown();
}
- shouldRun = false;
- if (masterThread != null) masterThread.shutdown();
-
if (reportCompileThreadPool != null) {
reportCompileThreadPool.shutdownNow();
}
-
if (masterThread != null) {
try {
masterThread.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
- LOG.error("interrupted while waiting for masterThread to " +
- "terminate", e);
+ LOG.error(
+ "interrupted while waiting for masterThread to " + "terminate", e);
}
}
if (reportCompileThreadPool != null) {
try {
reportCompileThreadPool.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
- LOG.error("interrupted while waiting for reportCompileThreadPool to " +
- "terminate", e);
+ LOG.error("interrupted while waiting for reportCompileThreadPool to "
+ + "terminate", e);
}
}
- if (!retainDiffs) clear();
+ if (!retainDiffs) {
+ clear();
+ }
}
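shutdown() now flips the run flag with AtomicBoolean.getAndSet(false), making the "was it ever started" check and the flag change one atomic step rather than two separate operations on a volatile field. A self-contained sketch of the idiom (all names hypothetical):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownOnceSketch {
  private final AtomicBoolean shouldRun = new AtomicBoolean(true);
  private final ExecutorService pool = Executors.newFixedThreadPool(2);

  void shutdown() throws InterruptedException {
    // getAndSet returns the previous value: a second caller (or a call
    // before start()) sees false and can warn instead of repeating work.
    if (!shouldRun.getAndSet(false)) {
      System.out.println("shutdown requested, but scanner was not running");
    }
    pool.shutdownNow();
    if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
      System.err.println("threads did not exit in time; leaving them running");
    }
  }

  public static void main(String[] args) throws InterruptedException {
    new ShutdownOnceSketch().shutdown();
  }
}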
/**
@@ -374,45 +430,54 @@ public class DirectoryScanner implements Runnable {
@VisibleForTesting
public void reconcile() throws IOException {
scan();
- for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
- String bpid = entry.getKey();
- LinkedList<ScanInfo> diff = entry.getValue();
-
- for (ScanInfo info : diff) {
- dataset.checkAndUpdate(bpid, info);
- }
+
+ for (final Map.Entry<String, ScanInfo> entry : diffs.getEntries()) {
+ dataset.checkAndUpdate(entry.getKey(), entry.getValue());
+ }
+
+ if (!retainDiffs) {
+ clear();
}
- if (!retainDiffs) clear();
}
/**
- * Scan for the differences between disk and in-memory blocks
- * Scan only the "finalized blocks" lists of both disk and memory.
+ * Scan for the differences between disk and in-memory blocks. Scan only
+ * the "finalized blocks" lists of both disk and memory.
*/
private void scan() {
+ BlockPoolReport blockPoolReport = new BlockPoolReport();
+
clear();
- Map<String, ScanInfo[]> diskReport = getDiskReport();
+
+ Collection<ScanInfoVolumeReport> volumeReports = getVolumeReports();
+ for (ScanInfoVolumeReport volumeReport : volumeReports) {
+ for (String blockPoolId : volumeReport.getBlockPoolIds()) {
+ List<ScanInfo> scanInfos = volumeReport.getScanInfo(blockPoolId);
+ blockPoolReport.addAll(blockPoolId, scanInfos);
+ }
+ }
+
+ // Pre-sort the reports outside of the lock
+ blockPoolReport.sortBlocks();
// Hold FSDataset lock to prevent further changes to the block map
- try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
- for (Entry<String, ScanInfo[]> entry : diskReport.entrySet()) {
- String bpid = entry.getKey();
- ScanInfo[] blockpoolReport = entry.getValue();
-
+ try (AutoCloseableLock lock = dataset.acquireDatasetLock()) {
+ for (final String bpid : blockPoolReport.getBlockPoolIds()) {
+ List<ScanInfo> blockpoolReport = blockPoolReport.getScanInfo(bpid);
+
Stats statsRecord = new Stats(bpid);
stats.put(bpid, statsRecord);
- LinkedList<ScanInfo> diffRecord = new LinkedList<ScanInfo>();
- diffs.put(bpid, diffRecord);
-
- statsRecord.totalBlocks = blockpoolReport.length;
+ Collection<ScanInfo> diffRecord = new ArrayList<>();
+
+ statsRecord.totalBlocks = blockpoolReport.size();
final List<ReplicaInfo> bl = dataset.getFinalizedBlocks(bpid);
Collections.sort(bl); // Sort based on blockId
-
+
int d = 0; // index for blockpoolReport
int m = 0; // index for memReport
- while (m < bl.size() && d < blockpoolReport.length) {
+ while (m < bl.size() && d < blockpoolReport.size()) {
ReplicaInfo memBlock = bl.get(m);
- ScanInfo info = blockpoolReport[d];
+ ScanInfo info = blockpoolReport.get(d);
if (info.getBlockId() < memBlock.getBlockId()) {
if (!dataset.isDeletingBlock(bpid, info.getBlockId())) {
// Block is missing in memory
@@ -424,15 +489,15 @@ public class DirectoryScanner implements Runnable {
}
if (info.getBlockId() > memBlock.getBlockId()) {
// Block is missing on the disk
- addDifference(diffRecord, statsRecord,
- memBlock.getBlockId(), info.getVolume());
+ addDifference(diffRecord, statsRecord, memBlock.getBlockId(),
+ info.getVolume());
m++;
continue;
}
// Block file and/or metadata file exists on the disk
// Block exists in memory
- if (info.getVolume().getStorageType() != StorageType.PROVIDED &&
- info.getBlockFile() == null) {
+ if (info.getVolume().getStorageType() != StorageType.PROVIDED
+ && info.getBlockFile() == null) {
// Block metadata file exists and block file is missing
addDifference(diffRecord, statsRecord, info);
} else if (info.getGenStamp() != memBlock.getGenerationStamp()
@@ -442,16 +507,16 @@ public class DirectoryScanner implements Runnable {
statsRecord.mismatchBlocks++;
addDifference(diffRecord, statsRecord, info);
} else if (memBlock.compareWith(info) != 0) {
- // volumeMap record and on-disk files don't match.
+ // volumeMap record and on-disk files do not match.
statsRecord.duplicateBlocks++;
addDifference(diffRecord, statsRecord, info);
}
d++;
- if (d < blockpoolReport.length) {
- // There may be multiple on-disk records for the same block, don't increment
- // the memory record pointer if so.
- ScanInfo nextInfo = blockpoolReport[d];
+ if (d < blockpoolReport.size()) {
+ // There may be multiple on-disk records for the same block, do not
+ // increment the memory record pointer if so.
+ ScanInfo nextInfo = blockpoolReport.get(d);
if (nextInfo.getBlockId() != info.getBlockId()) {
++m;
}
@@ -461,132 +526,108 @@ public class DirectoryScanner implements Runnable {
}
while (m < bl.size()) {
ReplicaInfo current = bl.get(m++);
- addDifference(diffRecord, statsRecord,
- current.getBlockId(), current.getVolume());
+ addDifference(diffRecord, statsRecord, current.getBlockId(),
+ current.getVolume());
}
- while (d < blockpoolReport.length) {
- if (!dataset.isDeletingBlock(bpid, blockpoolReport[d].getBlockId())) {
+ while (d < blockpoolReport.size()) {
+ if (!dataset.isDeletingBlock(bpid,
+ blockpoolReport.get(d).getBlockId())) {
statsRecord.missingMemoryBlocks++;
- addDifference(diffRecord, statsRecord, blockpoolReport[d]);
+ addDifference(diffRecord, statsRecord, blockpoolReport.get(d));
}
d++;
}
- LOG.info(statsRecord.toString());
- } //end for
- } //end synchronized
+ diffs.addAll(bpid, diffRecord);
+ LOG.info("Scan Results: {}", statsRecord);
+ }
+ }
}
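The loop above is a two-pointer merge over two blockId-sorted sequences: d walks the on-disk report, m walks the in-memory replica list, and whichever side is behind yields a difference. A simplified standalone sketch with bare block IDs; the real loop additionally compares genstamps and lengths, skips blocks being deleted, and tolerates duplicate on-disk records:

import java.util.Arrays;
import java.util.List;

public class TwoPointerDiffSketch {
  public static void main(String[] args) {
    List<Long> disk = Arrays.asList(1L, 2L, 4L, 7L);   // sorted by block ID
    List<Long> memory = Arrays.asList(1L, 3L, 4L);     // sorted by block ID
    int d = 0, m = 0;
    while (d < disk.size() && m < memory.size()) {
      long onDisk = disk.get(d);
      long inMem = memory.get(m);
      if (onDisk < inMem) {
        System.out.println(onDisk + ": missing in memory");
        d++;
      } else if (onDisk > inMem) {
        System.out.println(inMem + ": missing on disk");
        m++;
      } else {
        d++;   // matched; the real scan checks genstamp/length here
        m++;
      }
    }
    while (m < memory.size()) {
      System.out.println(memory.get(m++) + ": missing on disk");
    }
    while (d < disk.size()) {
      System.out.println(disk.get(d++) + ": missing in memory");
    }
  }
}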
/**
* Add the ScanInfo object to the list of differences and adjust the stats
- * accordingly. This method is called when a block is found on the disk,
- * but the in-memory block is missing or does not match the block on the disk.
+ * accordingly. This method is called when a block is found on the disk, but
+ * the in-memory block is missing or does not match the block on the disk.
*
- * @param diffRecord the list to which to add the info
+ * @param diffRecord the collection to which to add the info
* @param statsRecord the stats to update
* @param info the differing info
*/
- private void addDifference(LinkedList<ScanInfo> diffRecord,
- Stats statsRecord, ScanInfo info) {
+ private void addDifference(Collection<ScanInfo> diffRecord, Stats statsRecord,
+ ScanInfo info) {
statsRecord.missingMetaFile += info.getMetaFile() == null ? 1 : 0;
statsRecord.missingBlockFile += info.getBlockFile() == null ? 1 : 0;
diffRecord.add(info);
}
/**
- * Add a new ScanInfo object to the list of differences and adjust the stats
- * accordingly. This method is called when a block is not found on the disk.
+ * Add a new ScanInfo object to the collection of differences and adjust the
+ * stats accordingly. This method is called when a block is not found on the
+ * disk.
*
- * @param diffRecord the list to which to add the info
+ * @param diffRecord the collection to which to add the info
* @param statsRecord the stats to update
* @param blockId the id of the missing block
* @param vol the volume that contains the missing block
*/
- private void addDifference(LinkedList<ScanInfo> diffRecord,
- Stats statsRecord, long blockId,
- FsVolumeSpi vol) {
+ private void addDifference(Collection<ScanInfo> diffRecord, Stats statsRecord,
+ long blockId, FsVolumeSpi vol) {
statsRecord.missingBlockFile++;
statsRecord.missingMetaFile++;
diffRecord.add(new ScanInfo(blockId, null, null, vol));
}
/**
- * Get the lists of blocks on the disks in the dataset, sorted by blockId.
- * The returned map contains one entry per blockpool, keyed by the blockpool
- * ID.
- *
- * @return a map of sorted arrays of block information
+ * Get the lists of blocks on the disks in the data set.
*/
@VisibleForTesting
- public Map<String, ScanInfo[]> getDiskReport() {
- ScanInfoPerBlockPool list = new ScanInfoPerBlockPool();
- ScanInfoPerBlockPool[] dirReports = null;
+ public Collection<ScanInfoVolumeReport> getVolumeReports() {
+ List<ScanInfoVolumeReport> volReports = new ArrayList<>();
+ List<Future<ScanInfoVolumeReport>> compilersInProgress = new ArrayList<>();
+
// First get list of data directories
try (FsDatasetSpi.FsVolumeReferences volumes =
dataset.getFsVolumeReferences()) {
- // Use an array since the threads may return out of order and
- // compilersInProgress#keySet may return out of order as well.
- dirReports = new ScanInfoPerBlockPool[volumes.size()];
-
- Map<Integer, Future<ScanInfoPerBlockPool>> compilersInProgress =
- new HashMap<Integer, Future<ScanInfoPerBlockPool>>();
-
- for (int i = 0; i < volumes.size(); i++) {
- if (volumes.get(i).getStorageType() == StorageType.PROVIDED) {
- // Disable scanning PROVIDED volumes to keep overhead low
- continue;
+ for (final FsVolumeSpi volume : volumes) {
+ // Disable scanning PROVIDED volumes to keep overhead low
+ if (volume.getStorageType() != StorageType.PROVIDED) {
+ ReportCompiler reportCompiler = new ReportCompiler(volume);
+ Future<ScanInfoVolumeReport> result =
+ reportCompileThreadPool.submit(reportCompiler);
+ compilersInProgress.add(result);
}
- ReportCompiler reportCompiler =
- new ReportCompiler(datanode, volumes.get(i));
- Future<ScanInfoPerBlockPool> result =
- reportCompileThreadPool.submit(reportCompiler);
- compilersInProgress.put(i, result);
}
- for (Entry<Integer, Future<ScanInfoPerBlockPool>> report :
- compilersInProgress.entrySet()) {
- Integer index = report.getKey();
+ for (Future<ScanInfoVolumeReport> future : compilersInProgress) {
try {
- dirReports[index] = report.getValue().get();
-
- // If our compiler threads were interrupted, give up on this run
- if (dirReports[index] == null) {
- dirReports = null;
+ final ScanInfoVolumeReport result = future.get();
+ if (!CollectionUtils.addIgnoreNull(volReports, result)) {
+ // This compiler thread was interrupted; give up on this run
+ volReports.clear();
break;
}
} catch (Exception ex) {
- FsVolumeSpi fsVolumeSpi = volumes.get(index);
- LOG.error("Error compiling report for the volume, StorageId: "
- + fsVolumeSpi.getStorageID(), ex);
- // Continue scanning the other volumes
+ LOG.warn("Error compiling report. Continuing.", ex);
}
}
} catch (IOException e) {
LOG.error("Unexpected IOException by closing FsVolumeReference", e);
}
- if (dirReports != null) {
- // Compile consolidated report for all the volumes
- for (ScanInfoPerBlockPool report : dirReports) {
- if(report != null){
- list.addAll(report);
- }
- }
- }
- return list.toSortedArrays();
+
+ return volReports;
}
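getVolumeReports() fans one ReportCompiler out per non-PROVIDED volume, then drains the Futures in submission order, abandoning the whole run if any compiler was interrupted (signalled by a null report). A stripped-down sketch of that fan-out/fan-in shape, with strings standing in for volume reports (paths and names hypothetical):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class VolumeFanOutSketch {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    List<Future<String>> inProgress = new ArrayList<>();
    for (String volume : new String[] {"/data1", "/data2", "/data3"}) {
      inProgress.add(pool.submit(() -> "report for " + volume));
    }
    List<String> reports = new ArrayList<>();
    for (Future<String> future : inProgress) {
      try {
        String report = future.get();  // null would mean "interrupted"
        if (report == null) {
          reports.clear();             // give up on this run entirely
          break;
        }
        reports.add(report);
      } catch (ExecutionException e) {
        System.err.println("Error compiling report. Continuing. " + e);
      }
    }
    pool.shutdown();
    System.out.println(reports);
  }
}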
/**
* The ReportCompiler class encapsulates the process of searching a datanode's
- * disks for block information. It operates by performing a DFS of the
- * volume to discover block information.
+ * disks for block information. It operates by performing a DFS of the volume
+ * to discover block information.
*
* When the ReportCompiler discovers block information, it creates a new
- * ScanInfo object for it and adds that object to its report list. The report
+ * ScanInfo object for it and adds that object to its report list. The report
* list is returned by the {@link #call()} method.
*/
- public class ReportCompiler implements Callable<ScanInfoPerBlockPool> {
+ public class ReportCompiler implements Callable<ScanInfoVolumeReport> {
private final FsVolumeSpi volume;
- private final DataNode datanode;
// Variable for tracking time spent running for throttling purposes
private final StopWatch throttleTimer = new StopWatch();
// Variable for tracking time spent running and waiting for testing
@@ -594,13 +635,11 @@ public class DirectoryScanner implements Runnable {
private final StopWatch perfTimer = new StopWatch();
/**
- * Create a report compiler for the given volume on the given datanode.
+ * Create a report compiler for the given volume.
*
- * @param datanode the target datanode
* @param volume the target volume
*/
- public ReportCompiler(DataNode datanode, FsVolumeSpi volume) {
- this.datanode = datanode;
+ public ReportCompiler(FsVolumeSpi volume) {
this.volume = volume;
}
@@ -608,12 +647,13 @@ public class DirectoryScanner implements Runnable {
* Run this report compiler thread.
*
* @return the block info report list
- * @throws IOException if the block pool isn't found
+ * @throws IOException if the block pool is not found
*/
@Override
- public ScanInfoPerBlockPool call() throws IOException {
+ public ScanInfoVolumeReport call() throws IOException {
String[] bpList = volume.getBlockPoolList();
- ScanInfoPerBlockPool result = new ScanInfoPerBlockPool(bpList.length);
+ ScanInfoVolumeReport result =
+ new ScanInfoVolumeReport(volume, Arrays.asList(bpList));
perfTimer.start();
throttleTimer.start();
for (String bpid : bpList) {
@@ -623,33 +663,45 @@ public class DirectoryScanner implements Runnable {
throttleTimer.reset().start();
try {
- result.put(bpid, volume.compileReport(bpid, report, this));
+ // ScanInfos are added directly to 'report' list
+ volume.compileReport(bpid, report, this);
+ result.addAll(bpid, report);
} catch (InterruptedException ex) {
// Exit quickly and flag the scanner to do the same
result = null;
break;
}
}
+ LOG.trace("Scanner volume report: {}", result);
return result;
}
/**
- * Called by the thread before each potential disk scan so that a pause
- * can be optionally inserted to limit the number of scans per second.
- * The limit is controlled by
+ * Called by the thread before each potential disk scan so that a pause can
+ * be optionally inserted to limit the number of scans per second. The limit
+ * is controlled by
* {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY}.
*/
public void throttle() throws InterruptedException {
accumulateTimeRunning();
- if ((throttleLimitMsPerSec < 1000) &&
- (throttleTimer.now(TimeUnit.MILLISECONDS) > throttleLimitMsPerSec)) {
-
- Thread.sleep(MILLIS_PER_SECOND - throttleLimitMsPerSec);
- throttleTimer.reset().start();
+ if (throttleLimitMsPerSec > 0L) {
+ final long runningTime = throttleTimer.now(TimeUnit.MILLISECONDS);
+ if (runningTime >= throttleLimitMsPerSec) {
+ final long sleepTime;
+ if (runningTime >= 1000L) {
+ LOG.warn("Unable to throttle within the second. Blocking for 1s.");
+ sleepTime = 1000L;
+ } else {
+ // Sleep for the expected time plus any time processing ran over
+ final long overTime = runningTime - throttleLimitMsPerSec;
+ sleepTime = (1000L - throttleLimitMsPerSec) + overTime;
+ }
+ Thread.sleep(sleepTime);
+ throttleTimer.reset().start();
+ }
+ accumulateTimeWaiting();
}
-
- accumulateTimeWaiting();
}
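The rewritten throttle() also pays back time the compiler ran past its per-second budget: the sleep is the usual (1000 - limit) pause plus however far runningTime overshot the limit, capped at one full second. A worked sketch of just that arithmetic (class and method are illustrative only, not part of this patch):

public class ThrottleSleepSketch {
  static long sleepFor(long runningTimeMs, long limitMsPerSec) {
    if (runningTimeMs < limitMsPerSec) {
      return 0L;                            // still under budget, no pause
    }
    if (runningTimeMs >= 1000L) {
      return 1000L;                         // blew the whole second; cap it
    }
    long overTime = runningTimeMs - limitMsPerSec;
    return (1000L - limitMsPerSec) + overTime;  // repay the overrun
  }

  public static void main(String[] args) {
    System.out.println(sleepFor(100, 100));   // 900: exactly on budget
    System.out.println(sleepFor(130, 100));   // 930: 30ms over, sleep longer
    System.out.println(sleepFor(1200, 100));  // 1000: capped at one second
  }
}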
/**
@@ -679,4 +731,5 @@ public class DirectoryScanner implements Runnable {
|| name.startsWith(Block.BLOCK_FILE_PREFIX);
}
}
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dc0adfa/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index 312bc86..e29a147 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -33,6 +33,7 @@ import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -44,26 +45,23 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
-import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
@@ -73,14 +71,17 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.Time;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Tests {@link DirectoryScanner} handling of differences
- * between blocks on the disk and block in memory.
+ * Tests {@link DirectoryScanner} handling of differences between blocks on the
+ * disk and block in memory.
*/
public class TestDirectoryScanner {
private static final Logger LOG =
@@ -102,7 +103,7 @@ public class TestDirectoryScanner {
CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
CONF.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
- getMemlockLimit(Long.MAX_VALUE));
+ getMemlockLimit(Long.MAX_VALUE));
}
@Before
@@ -110,21 +111,20 @@ public class TestDirectoryScanner {
LazyPersistTestCase.initCacheManipulator();
}
- /** create a file with a length of <code>fileLen</code> */
- private List<LocatedBlock> createFile(String fileNamePrefix,
- long fileLen,
- boolean isLazyPersist) throws IOException {
+ /** create a file with a length of <code>fileLen</code>. */
+ private List<LocatedBlock> createFile(String fileNamePrefix, long fileLen,
+ boolean isLazyPersist) throws IOException {
FileSystem fs = cluster.getFileSystem();
Path filePath = new Path("/" + fileNamePrefix + ".dat");
- DFSTestUtil.createFile(
- fs, filePath, isLazyPersist, 1024, fileLen,
+ DFSTestUtil.createFile(fs, filePath, isLazyPersist, 1024, fileLen,
BLOCK_LENGTH, (short) 1, r.nextLong(), false);
- return client.getLocatedBlocks(filePath.toString(), 0, fileLen).getLocatedBlocks();
+ return client.getLocatedBlocks(filePath.toString(), 0, fileLen)
+ .getLocatedBlocks();
}
- /** Truncate a block file */
+ /** Truncate a block file. */
private long truncateBlockFile() throws IOException {
- try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
+ try (AutoCloseableLock lock = fds.acquireDatasetLock()) {
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
File f = new File(b.getBlockURI());
File mf = new File(b.getMetadataURI());
@@ -149,7 +149,7 @@ public class TestDirectoryScanner {
/** Delete a block file */
private long deleteBlockFile() {
- try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
+ try (AutoCloseableLock lock = fds.acquireDatasetLock()) {
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
File f = new File(b.getBlockURI());
File mf = new File(b.getMetadataURI());
@@ -165,7 +165,7 @@ public class TestDirectoryScanner {
/** Delete block meta file */
private long deleteMetaFile() {
- try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
+ try (AutoCloseableLock lock = fds.acquireDatasetLock()) {
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
// Delete a metadata file
if (b.metadataExists() && b.deleteMetadata()) {
@@ -179,11 +179,12 @@ public class TestDirectoryScanner {
/**
* Duplicate the given block on all volumes.
+ *
* @param blockId
* @throws IOException
*/
private void duplicateBlock(long blockId) throws IOException {
- try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
+ try (AutoCloseableLock lock = fds.acquireDatasetLock()) {
ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
try (FsDatasetSpi.FsVolumeReferences volumes =
fds.getFsVolumeReferences()) {
@@ -199,16 +200,14 @@ public class TestDirectoryScanner {
URI destRoot = v.getStorageLocation().getUri();
String relativeBlockPath =
- sourceRoot.relativize(sourceBlock.toURI())
- .getPath();
+ sourceRoot.relativize(sourceBlock.toURI()).getPath();
String relativeMetaPath =
- sourceRoot.relativize(sourceMeta.toURI())
- .getPath();
+ sourceRoot.relativize(sourceMeta.toURI()).getPath();
- File destBlock = new File(new File(destRoot).toString(),
- relativeBlockPath);
- File destMeta = new File(new File(destRoot).toString(),
- relativeMetaPath);
+ File destBlock =
+ new File(new File(destRoot).toString(), relativeBlockPath);
+ File destMeta =
+ new File(new File(destRoot).toString(), relativeMetaPath);
destBlock.getParentFile().mkdirs();
FileUtils.copyFile(sourceBlock, destBlock);
@@ -223,7 +222,7 @@ public class TestDirectoryScanner {
}
}
- /** Get a random blockId that is not used already */
+ /** Get a random blockId that is not used already. */
private long getFreeBlockId() {
long id = rand.nextLong();
while (true) {
@@ -244,14 +243,15 @@ public class TestDirectoryScanner {
+ Block.METADATA_EXTENSION;
}
- /** Create a block file in a random volume*/
+ /** Create a block file in a random volume. */
private long createBlockFile() throws IOException {
long id = getFreeBlockId();
- try (FsDatasetSpi.FsVolumeReferences volumes = fds.getFsVolumeReferences()) {
+ try (
+ FsDatasetSpi.FsVolumeReferences volumes = fds.getFsVolumeReferences()) {
int numVolumes = volumes.size();
int index = rand.nextInt(numVolumes - 1);
- File finalizedDir = ((FsVolumeImpl) volumes.get(index))
- .getFinalizedDir(bpid);
+ File finalizedDir =
+ ((FsVolumeImpl) volumes.get(index)).getFinalizedDir(bpid);
File file = new File(finalizedDir, getBlockFile(id));
if (file.createNewFile()) {
LOG.info("Created block file " + file.getName());
@@ -260,14 +260,14 @@ public class TestDirectoryScanner {
return id;
}
- /** Create a metafile in a random volume*/
+ /** Create a metafile in a random volume */
private long createMetaFile() throws IOException {
long id = getFreeBlockId();
try (FsDatasetSpi.FsVolumeReferences refs = fds.getFsVolumeReferences()) {
int numVolumes = refs.size();
int index = rand.nextInt(numVolumes - 1);
- File finalizedDir = ((FsVolumeImpl) refs.get(index))
- .getFinalizedDir(bpid);
+ File finalizedDir =
+ ((FsVolumeImpl) refs.get(index)).getFinalizedDir(bpid);
File file = new File(finalizedDir, getMetaFile(id));
if (file.createNewFile()) {
LOG.info("Created metafile " + file.getName());
@@ -276,7 +276,7 @@ public class TestDirectoryScanner {
return id;
}
- /** Create block file and corresponding metafile in a rondom volume */
+ /** Create block file and corresponding metafile in a random volume. */
private long createBlockMetaFile() throws IOException {
long id = getFreeBlockId();
@@ -318,7 +318,7 @@ public class TestDirectoryScanner {
long missingBlockFile, long missingMemoryBlocks, long mismatchBlocks)
throws IOException, InterruptedException, TimeoutException {
scan(totalBlocks, diffsize, missingMetaFile, missingBlockFile,
- missingMemoryBlocks, mismatchBlocks, 0);
+ missingMemoryBlocks, mismatchBlocks, 0);
}
private void scan(long totalBlocks, int diffsize, long missingMetaFile,
@@ -332,22 +332,22 @@ public class TestDirectoryScanner {
verifyStats(totalBlocks, diffsize, missingMetaFile, missingBlockFile,
missingMemoryBlocks, mismatchBlocks, duplicateBlocks);
} catch (AssertionError ex) {
+ LOG.warn("Assertion Error", ex);
return false;
}
return true;
- }, 50, 2000);
+ }, 100, 2000);
}
private void verifyStats(long totalBlocks, int diffsize, long missingMetaFile,
long missingBlockFile, long missingMemoryBlocks, long mismatchBlocks,
long duplicateBlocks) {
- assertTrue(scanner.diffs.containsKey(bpid));
- LinkedList<FsVolumeSpi.ScanInfo> diff = scanner.diffs.get(bpid);
- assertTrue(scanner.stats.containsKey(bpid));
- DirectoryScanner.Stats stats = scanner.stats.get(bpid);
-
+ Collection<FsVolumeSpi.ScanInfo> diff = scanner.diffs.getScanInfo(bpid);
assertEquals(diffsize, diff.size());
+
+ DirectoryScanner.Stats stats = scanner.stats.get(bpid);
+ assertNotNull(stats);
assertEquals(totalBlocks, stats.totalBlocks);
assertEquals(missingMetaFile, stats.missingMetaFile);
assertEquals(missingBlockFile, stats.missingBlockFile);
@@ -356,20 +356,18 @@ public class TestDirectoryScanner {
assertEquals(duplicateBlocks, stats.duplicateBlocks);
}
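The scan() helper above retries verifyStats() inside GenericTestUtils.waitFor, now polling every 100 ms (up from 50) and logging the swallowed AssertionError instead of dropping it silently. A tiny sketch of the polling pattern, assuming the waitFor(Supplier<Boolean>, int, int) signature Hadoop's test utilities exposed at the time:

import org.apache.hadoop.test.GenericTestUtils;

public class WaitForSketch {
  public static void main(String[] args) throws Exception {
    long deadline = System.currentTimeMillis() + 500;
    // Re-evaluate the condition every 100 ms for at most 2000 ms;
    // waitFor throws TimeoutException if it never becomes true.
    GenericTestUtils.waitFor(() -> System.currentTimeMillis() > deadline,
        100, 2000);
    System.out.println("condition met");
  }
}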
- @Test (timeout=300000)
+ @Test(timeout = 300000)
public void testRetainBlockOnPersistentStorage() throws Exception {
- cluster = new MiniDFSCluster
- .Builder(CONF)
- .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
- .numDataNodes(1)
- .build();
+ cluster = new MiniDFSCluster.Builder(CONF)
+ .storageTypes(
+ new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
+ .numDataNodes(1).build();
try {
cluster.waitActive();
- DataNode dataNode = cluster.getDataNodes().get(0);
bpid = cluster.getNamesystem().getBlockPoolId();
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
client = cluster.getFileSystem().getClient();
- scanner = new DirectoryScanner(dataNode, fds, CONF);
+ scanner = new DirectoryScanner(fds, CONF);
scanner.setRetainDiffs(true);
FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
@@ -397,24 +395,22 @@ public class TestDirectoryScanner {
}
}
- @Test (timeout=300000)
+ @Test(timeout = 300000)
public void testDeleteBlockOnTransientStorage() throws Exception {
- cluster = new MiniDFSCluster
- .Builder(CONF)
- .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
- .numDataNodes(1)
- .build();
+ cluster = new MiniDFSCluster.Builder(CONF)
+ .storageTypes(
+ new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
+ .numDataNodes(1).build();
try {
cluster.waitActive();
bpid = cluster.getNamesystem().getBlockPoolId();
- DataNode dataNode = cluster.getDataNodes().get(0);
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
client = cluster.getFileSystem().getClient();
- scanner = new DirectoryScanner(dataNode, fds, CONF);
+ scanner = new DirectoryScanner(fds, CONF);
scanner.setRetainDiffs(true);
FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
- // Create a file file on RAM_DISK
+ // Create a file on RAM_DISK
List<LocatedBlock> blocks =
createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, true);
@@ -440,14 +436,14 @@ public class TestDirectoryScanner {
}
}
- @Test (timeout=600000)
+ @Test(timeout = 600000)
public void testDirectoryScanner() throws Exception {
// Run the test with and without parallel scanning
for (int parallelism = 1; parallelism < 3; parallelism++) {
runTest(parallelism);
}
}
-
+
public void runTest(int parallelism) throws Exception {
cluster = new MiniDFSCluster.Builder(CONF).build();
try {
@@ -456,9 +452,9 @@ public class TestDirectoryScanner {
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
client = cluster.getFileSystem().getClient();
CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
- parallelism);
- DataNode dataNode = cluster.getDataNodes().get(0);
- scanner = new DirectoryScanner(dataNode, fds, CONF);
+ parallelism);
+
+ scanner = new DirectoryScanner(fds, CONF);
scanner.setRetainDiffs(true);
// Add files with 100 blocks
@@ -492,7 +488,7 @@ public class TestDirectoryScanner {
// Test5: A metafile exists for which there is no block file and
// a block in memory
blockId = createMetaFile();
- scan(totalBlocks+1, 1, 0, 1, 1, 0);
+ scan(totalBlocks + 1, 1, 0, 1, 1, 0);
File metafile = new File(getMetaFile(blockId));
assertTrue(!metafile.exists());
scan(totalBlocks, 0, 0, 0, 0, 0);
@@ -521,7 +517,7 @@ public class TestDirectoryScanner {
scan(totalBlocks, 0, 0, 0, 0, 0);
// Test9: create a bunch of blocks files
- for (int i = 0; i < 10 ; i++) {
+ for (int i = 0; i < 10; i++) {
blockId = createBlockFile();
}
totalBlocks += 10;
@@ -529,14 +525,14 @@ public class TestDirectoryScanner {
scan(totalBlocks, 0, 0, 0, 0, 0);
// Test10: create a bunch of metafiles
- for (int i = 0; i < 10 ; i++) {
+ for (int i = 0; i < 10; i++) {
blockId = createMetaFile();
}
- scan(totalBlocks+10, 10, 0, 10, 10, 0);
+ scan(totalBlocks + 10, 10, 0, 10, 10, 0);
scan(totalBlocks, 0, 0, 0, 0, 0);
// Test11: create a bunch block files and meta files
- for (int i = 0; i < 10 ; i++) {
+ for (int i = 0; i < 10; i++) {
blockId = createBlockMetaFile();
}
totalBlocks += 10;
@@ -544,7 +540,7 @@ public class TestDirectoryScanner {
scan(totalBlocks, 0, 0, 0, 0, 0);
// Test12: truncate block files to test block length mismatch
- for (int i = 0; i < 10 ; i++) {
+ for (int i = 0; i < 10; i++) {
truncateBlockFile();
}
scan(totalBlocks, 10, 0, 0, 0, 10);
@@ -557,9 +553,9 @@ public class TestDirectoryScanner {
deleteMetaFile();
deleteBlockFile();
truncateBlockFile();
- scan(totalBlocks+3, 6, 2, 2, 3, 2);
- scan(totalBlocks+1, 0, 0, 0, 0, 0);
-
+ scan(totalBlocks + 3, 6, 2, 2, 3, 2);
+ scan(totalBlocks + 1, 0, 0, 0, 0, 0);
+
// Test14: make sure no throttling is happening
assertTrue("Throttle appears to be engaged",
scanner.timeWaitingMs.get() < 10L);
@@ -567,10 +563,11 @@ public class TestDirectoryScanner {
scanner.timeRunningMs.get() > 0L);
// Test15: validate clean shutdown of DirectoryScanner
- ////assertTrue(scanner.getRunStatus()); //assumes "real" FSDataset, not sim
+ //// assertTrue(scanner.getRunStatus()); // assumes "real" FSDataset, not sim
scanner.shutdown();
assertFalse(scanner.getRunStatus());
-
+
} finally {
if (scanner != null) {
scanner.shutdown();
@@ -582,17 +579,17 @@ public class TestDirectoryScanner {
/**
* Test that the timeslice throttle limits the report compiler thread's
- * execution time correctly. We test by scanning a large block pool and
+ * execution time correctly. We test by scanning a large block pool and
* comparing the time spent waiting to the time spent running.
*
- * The block pool has to be large, or the ratio will be off. The throttle
- * allows the report compiler thread to finish its current cycle when
- * blocking it, so the ratio will always be a little lower than expected.
- * The smaller the block pool, the further off the ratio will be.
+ * The block pool has to be large, or the ratio will be off. The throttle
+ * allows the report compiler thread to finish its current cycle when blocking
+ * it, so the ratio will always be a little lower than expected. The smaller
+ * the block pool, the further off the ratio will be.
*
* @throws Exception thrown on unexpected failure
*/
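To make the expected ratio concrete: with the 100 ms/sec limit set below, the compiler should run roughly 100 ms and wait roughly 900 ms of every second, so waiting/running trends toward 9; the test accepts 7 to 10 because the throttle lets the current cycle finish before blocking. The arithmetic, for illustration only:

public class ThrottleRatioSketch {
  public static void main(String[] args) {
    float limitMsPerSec = 100f;
    float expectedRatio = (1000f - limitMsPerSec) / limitMsPerSec;
    System.out.println(expectedRatio);  // 9.0, inside the accepted 7..10 band
  }
}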
- @Test (timeout=600000)
+ @Test(timeout = 600000)
public void testThrottling() throws Exception {
Configuration conf = new Configuration(CONF);
@@ -611,10 +608,9 @@ public class TestDirectoryScanner {
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
100);
- DataNode dataNode = cluster.getDataNodes().get(0);
- final int maxBlocksPerFile = (int) DFSConfigKeys
- .DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT;
+ final int maxBlocksPerFile =
+ (int) DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT;
int numBlocksToCreate = blocks;
while (numBlocksToCreate > 0) {
final int toCreate = Math.min(maxBlocksPerFile, numBlocksToCreate);
@@ -627,7 +623,7 @@ public class TestDirectoryScanner {
int retries = maxRetries;
while ((retries > 0) && ((ratio < 7f) || (ratio > 10f))) {
- scanner = new DirectoryScanner(dataNode, fds, conf);
+ scanner = new DirectoryScanner(fds, conf);
ratio = runThrottleTest(blocks);
retries -= 1;
}
@@ -645,7 +641,7 @@ public class TestDirectoryScanner {
retries = maxRetries;
while ((retries > 0) && ((ratio < 2.75f) || (ratio > 4.5f))) {
- scanner = new DirectoryScanner(dataNode, fds, conf);
+ scanner = new DirectoryScanner(fds, conf);
ratio = runThrottleTest(blocks);
retries -= 1;
}
@@ -664,7 +660,7 @@ public class TestDirectoryScanner {
retries = maxRetries;
while ((retries > 0) && ((ratio < 7f) || (ratio > 10f))) {
- scanner = new DirectoryScanner(dataNode, fds, conf);
+ scanner = new DirectoryScanner(fds, conf);
ratio = runThrottleTest(blocks);
retries -= 1;
}
@@ -675,7 +671,7 @@ public class TestDirectoryScanner {
assertTrue("Throttle is too permissive", ratio >= 7f);
// Test with no limit
- scanner = new DirectoryScanner(dataNode, fds, CONF);
+ scanner = new DirectoryScanner(fds, CONF);
scanner.setRetainDiffs(true);
scan(blocks, 0, 0, 0, 0, 0);
scanner.shutdown();
@@ -686,7 +682,7 @@ public class TestDirectoryScanner {
assertTrue("Report complier threads logged no execution time",
scanner.timeRunningMs.get() > 0L);
- // Test with a 1ms limit. This also tests whether the scanner can be
+ // Test with a 1ms limit. This also tests whether the scanner can be
// shutdown cleanly in mid stride.
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
@@ -698,7 +694,7 @@ public class TestDirectoryScanner {
try {
while ((retries > 0) && (ratio < 10)) {
- scanner = new DirectoryScanner(dataNode, fds, conf);
+ scanner = new DirectoryScanner(fds, conf);
scanner.setRetainDiffs(true);
final AtomicLong nowMs = new AtomicLong();
@@ -728,7 +724,7 @@ public class TestDirectoryScanner {
}
ratio =
- (float)scanner.timeWaitingMs.get() / scanner.timeRunningMs.get();
+ (float) scanner.timeWaitingMs.get() / scanner.timeRunningMs.get();
retries -= 1;
}
} finally {
@@ -737,8 +733,7 @@ public class TestDirectoryScanner {
// We just want to test that it waits a lot, but it also runs some
LOG.info("RATIO: " + ratio);
- assertTrue("Throttle is too permissive",
- ratio > 10);
+ assertTrue("Throttle is too permissive", ratio > 8);
assertTrue("Report complier threads logged no execution time",
scanner.timeRunningMs.get() > 0L);
@@ -746,7 +741,7 @@ public class TestDirectoryScanner {
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
0);
- scanner = new DirectoryScanner(dataNode, fds, conf);
+ scanner = new DirectoryScanner(fds, conf);
scanner.setRetainDiffs(true);
scan(blocks, 0, 0, 0, 0, 0);
scanner.shutdown();
@@ -761,7 +756,7 @@ public class TestDirectoryScanner {
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
1000);
- scanner = new DirectoryScanner(dataNode, fds, conf);
+ scanner = new DirectoryScanner(fds, conf);
scanner.setRetainDiffs(true);
scan(blocks, 0, 0, 0, 0, 0);
scanner.shutdown();
@@ -777,9 +772,8 @@ public class TestDirectoryScanner {
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
10);
- conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
- 1);
- scanner = new DirectoryScanner(dataNode, fds, conf);
+ conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
+ scanner = new DirectoryScanner(fds, conf);
scanner.setRetainDiffs(true);
scanner.start();
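Taken together, the two configuration keys above drive the periodic-scan test: the throttle key caps report-compiler runtime per second, and the interval key makes the scanner wake every second. A hedged sketch of wiring them up, with fds standing in for the dataset as elsewhere in this file:

    // Sketch only: configure a tightly throttled, frequent scan.
    Configuration conf = new Configuration();
    conf.setInt(
        DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
        10); // at most ~10 ms of compiler runtime per second
    conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
    DirectoryScanner scanner = new DirectoryScanner(fds, conf);
    scanner.setRetainDiffs(true);
    scanner.start(); // scans on the configured interval until shutdown()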
@@ -805,7 +799,7 @@ public class TestDirectoryScanner {
scanner.shutdown();
assertFalse(scanner.getRunStatus());
- return (float)scanner.timeWaitingMs.get() / scanner.timeRunningMs.get();
+ return (float) scanner.timeWaitingMs.get() / scanner.timeRunningMs.get();
}
private void verifyAddition(long blockId, long genStamp, long size) {
@@ -836,7 +830,7 @@ public class TestDirectoryScanner {
assertNotNull(memBlock);
assertEquals(genStamp, memBlock.getGenerationStamp());
}
-
+
private void verifyStorageType(long blockId, boolean expectTransient) {
final ReplicaInfo memBlock;
memBlock = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
@@ -859,7 +853,7 @@ public class TestDirectoryScanner {
public long getAvailable() throws IOException {
return 0;
}
-
+
public File getFinalizedDir(String bpid) throws IOException {
return new File("/base/current/" + bpid + "/finalized");
}
@@ -898,10 +892,11 @@ public class TestDirectoryScanner {
@Override
public BlockIterator loadBlockIterator(String bpid, String name)
- throws IOException {
+ throws IOException {
throw new UnsupportedOperationException();
}
+ @SuppressWarnings("rawtypes")
@Override
public FsDatasetSpi getDataset() {
throw new UnsupportedOperationException();
@@ -923,8 +918,8 @@ public class TestDirectoryScanner {
}
@Override
- public byte[] loadLastPartialChunkChecksum(
- File blockFile, File metaFile) throws IOException {
+ public byte[] loadLastPartialChunkChecksum(File blockFile, File metaFile)
+ throws IOException {
return null;
}
@@ -945,7 +940,6 @@ public class TestDirectoryScanner {
return null;
}
-
@Override
public VolumeCheckResult check(VolumeCheckContext context)
throws Exception {
@@ -954,11 +948,11 @@ public class TestDirectoryScanner {
}
private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();
-
+
private final static String BPID_1 = "BP-783049782-127.0.0.1-1370971773491";
-
+
private final static String BPID_2 = "BP-367845636-127.0.0.1-5895645674231";
-
+
void testScanInfoObject(long blockId, File blockFile, File metaFile)
throws Exception {
FsVolumeSpi.ScanInfo scanInfo =
@@ -978,7 +972,7 @@ public class TestDirectoryScanner {
}
assertEquals(TEST_VOLUME, scanInfo.getVolume());
}
-
+
void testScanInfoObject(long blockId) throws Exception {
FsVolumeSpi.ScanInfo scanInfo =
new FsVolumeSpi.ScanInfo(blockId, null, null, null);
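As these helpers show, FsVolumeSpi.ScanInfo takes (blockId, blockFile, metaFile, volume) and accepts null for any of the last three arguments. A hedged sketch of both forms, reusing the TEST_VOLUME and BPID_1 fixtures defined in this file:

    // Sketch: construct ScanInfo with and without files; the getters
    // asserted on in the tests (getBlockId, getVolume, getBlockFile,
    // getMetaFile) mirror the constructor arguments.
    File dir = new File(TEST_VOLUME.getFinalizedDir(BPID_1).getAbsolutePath());
    FsVolumeSpi.ScanInfo full = new FsVolumeSpi.ScanInfo(
        123,                                 // block id
        new File(dir, "blk_123"),            // block file
        new File(dir, "blk_123__1009.meta"), // meta file
        TEST_VOLUME);                        // owning volume
    FsVolumeSpi.ScanInfo bare =
        new FsVolumeSpi.ScanInfo(456, null, null, null); // all-null form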
@@ -987,7 +981,7 @@ public class TestDirectoryScanner {
assertNull(scanInfo.getMetaFile());
}
- @Test(timeout=120000)
+ @Test(timeout = 120000)
public void TestScanInfo() throws Exception {
testScanInfoObject(123,
new File(TEST_VOLUME.getFinalizedDir(BPID_1).getAbsolutePath(),
@@ -998,13 +992,10 @@ public class TestDirectoryScanner {
new File(TEST_VOLUME.getFinalizedDir(BPID_1).getAbsolutePath(),
"blk_123"),
null);
- testScanInfoObject(523,
- null,
+ testScanInfoObject(523, null,
new File(TEST_VOLUME.getFinalizedDir(BPID_1).getAbsolutePath(),
"blk_123__1009.meta"));
- testScanInfoObject(789,
- null,
- null);
+ testScanInfoObject(789, null, null);
testScanInfoObject(456);
testScanInfoObject(123,
new File(TEST_VOLUME.getFinalizedDir(BPID_2).getAbsolutePath(),
@@ -1027,7 +1018,6 @@ public class TestDirectoryScanner {
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
client = cluster.getFileSystem().getClient();
CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
- DataNode dataNode = cluster.getDataNodes().get(0);
// Add files with 2 blocks
createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH * 2, false);
@@ -1047,7 +1037,7 @@ public class TestDirectoryScanner {
FsDatasetSpi<? extends FsVolumeSpi> spyFds = Mockito.spy(fds);
Mockito.doReturn(volReferences).when(spyFds).getFsVolumeReferences();
- scanner = new DirectoryScanner(dataNode, spyFds, CONF);
+ scanner = new DirectoryScanner(spyFds, CONF);
scanner.setRetainDiffs(true);
scanner.reconcile();
} finally {
@@ -1061,28 +1051,27 @@ public class TestDirectoryScanner {
@Test
public void testDirectoryScannerInFederatedCluster() throws Exception {
- //Create Federated cluster with two nameservices and one DN
+ // Create Federated cluster with two nameservices and one DN
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF)
.nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(2))
.numDataNodes(1).build()) {
cluster.waitActive();
cluster.transitionToActive(1);
cluster.transitionToActive(3);
- DataNode dataNode = cluster.getDataNodes().get(0);
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
- //Create one block in first nameservice
+ // Create one block in first nameservice
FileSystem fs = cluster.getFileSystem(1);
int bp1Files = 1;
writeFile(fs, bp1Files);
- //Create two blocks in second nameservice
+ // Create two blocks in second nameservice
FileSystem fs2 = cluster.getFileSystem(3);
int bp2Files = 2;
writeFile(fs2, bp2Files);
- //Call the Directory scanner
- scanner = new DirectoryScanner(dataNode, fds, CONF);
+ // Call the Directory scanner
+ scanner = new DirectoryScanner(fds, CONF);
scanner.setRetainDiffs(true);
scanner.reconcile();
- //Check blocks in corresponding BP
+ // Check blocks in corresponding BP
GenericTestUtils.waitFor(() -> {
try {
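The hunk is cut short here by the diff context, but the pattern it opens is GenericTestUtils.waitFor: poll a boolean condition until it holds or a timeout expires. A hedged sketch of that pattern, with a placeholder condition rather than the test's actual per-block-pool check:

    // Sketch of the waitFor polling pattern the federated test uses;
    // the condition body is a placeholder, not the test's real check.
    GenericTestUtils.waitFor(() -> {
      try {
        return true; // e.g. compare per-block-pool block counts here
      } catch (Exception e) {
        return false;
      }
    }, 100, 10000); // poll every 100 ms, give up after 10 s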
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dc0adfa/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
index 422acc3..a48e2f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
@@ -34,6 +34,7 @@ import java.io.Writer;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -66,6 +67,7 @@ import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ScanInfoVolumeReport;
import org.apache.hadoop.hdfs.server.datanode.FinalizedProvidedReplica;
import org.apache.hadoop.hdfs.server.datanode.ProvidedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
@@ -73,9 +75,9 @@ import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.TestProvidedReplicaImpl;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.StringUtils;
@@ -183,7 +185,6 @@ public class TestProvidedImpl {
public static class TestFileRegionBlockAliasMap
extends BlockAliasMap<FileRegion> {
- private Configuration conf;
private int minId;
private int numBlocks;
private Iterator<FileRegion> suppliedIterator;
@@ -592,11 +593,13 @@ public class TestProvidedImpl {
@Test
public void testScannerWithProvidedVolumes() throws Exception {
- DirectoryScanner scanner = new DirectoryScanner(datanode, dataset, conf);
- Map<String, FsVolumeSpi.ScanInfo[]> report = scanner.getDiskReport();
+ DirectoryScanner scanner = new DirectoryScanner(dataset, conf);
+ Collection<ScanInfoVolumeReport> reports = scanner.getVolumeReports();
// no blocks should be reported for the Provided volume as long as
// the directoryScanner is disabled.
- assertEquals(0, report.get(BLOCK_POOL_IDS[CHOSEN_BP_ID]).length);
+ for (ScanInfoVolumeReport report : reports) {
+ assertEquals(0, report.getScanInfo(BLOCK_POOL_IDS[CHOSEN_BP_ID]).size());
+ }
}
/**
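This hunk captures the report API change that motivated the test updates earlier in the patch: getDiskReport()'s Map keyed by block pool is replaced by getVolumeReports(), a Collection of per-volume ScanInfoVolumeReport objects, each answering getScanInfo(bpid). A hedged sketch of consuming the new shape, with dataset, conf, and bpid as stand-ins matching the test above:

    // Sketch: iterate the per-volume reports and count ScanInfo
    // entries for one block pool; names here mirror the test above.
    DirectoryScanner scanner = new DirectoryScanner(dataset, conf);
    for (ScanInfoVolumeReport report : scanner.getVolumeReports()) {
      int entries = report.getScanInfo(bpid).size();
      System.out.println("volume reported " + entries + " blocks");
    }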
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dc0adfa/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
index e1c8ae3..db12146 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
@@ -63,14 +64,19 @@ public class TestListCorruptFileBlocks {
@Test (timeout=300000)
public void testListCorruptFilesCorruptedBlock() throws Exception {
MiniDFSCluster cluster = null;
- Random random = new Random();
-
+
try {
Configuration conf = new HdfsConfiguration();
- conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1); // datanode scans directories
- conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 3 * 1000); // datanode sends block reports
+
+ // datanode scans directories
+ conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
+
+ // datanode sends block reports
+ conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 3 * 1000);
+
// Set short retry timeouts so this test runs faster
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
+
cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = cluster.getFileSystem();
@@ -84,8 +90,8 @@ public class TestListCorruptFileBlocks {
final NameNode namenode = cluster.getNameNode();
Collection<FSNamesystem.CorruptFileBlockInfo> badFiles = namenode.
getNamesystem().listCorruptFileBlocks("/", null);
- assertTrue("Namenode has " + badFiles.size()
- + " corrupt files. Expecting None.", badFiles.size() == 0);
+ assertEquals("Namenode has " + badFiles.size()
+ + " corrupt files. Expecting None.", 0, badFiles.size());
// Now deliberately corrupt one block
String bpid = cluster.getNamesystem().getBlockPoolId();
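The assertTrue-to-assertEquals conversions throughout this file are not cosmetic: assertEquals reports both the expected and actual values on failure, while assertTrue(x == 0) can only report that the condition was false. For example:

    // On failure, assertEquals prints "expected:<0> but was:<3>";
    // the assertTrue form would print only the message string.
    assertEquals("Unexpected corrupt files", 0, badFiles.size());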
@@ -101,7 +107,7 @@ public class TestListCorruptFileBlocks {
long position = channel.size() - 2;
int length = 2;
byte[] buffer = new byte[length];
- random.nextBytes(buffer);
+ new Random(13L).nextBytes(buffer);
channel.write(ByteBuffer.wrap(buffer), position);
file.close();
LOG.info("Deliberately corrupting file " + metaFile.getName() +
@@ -134,7 +140,6 @@ public class TestListCorruptFileBlocks {
@Test (timeout=300000)
public void testListCorruptFileBlocksInSafeMode() throws Exception {
MiniDFSCluster cluster = null;
- Random random = new Random();
try {
Configuration conf = new HdfsConfiguration();
@@ -164,8 +169,8 @@ public class TestListCorruptFileBlocks {
// fetch bad file list from namenode. There should be none.
Collection<FSNamesystem.CorruptFileBlockInfo> badFiles =
cluster.getNameNode().getNamesystem().listCorruptFileBlocks("/", null);
- assertTrue("Namenode has " + badFiles.size()
- + " corrupt files. Expecting None.", badFiles.size() == 0);
+ assertEquals("Namenode has " + badFiles.size()
+ + " corrupt files. Expecting None.", 0, badFiles.size());
// Now deliberately corrupt one block
File storageDir = cluster.getInstanceStorageDir(0, 0);
@@ -181,7 +186,7 @@ public class TestListCorruptFileBlocks {
long position = channel.size() - 2;
int length = 2;
byte[] buffer = new byte[length];
- random.nextBytes(buffer);
+ new Random(13L).nextBytes(buffer);
channel.write(ByteBuffer.wrap(buffer), position);
file.close();
LOG.info("Deliberately corrupting file " + metaFile.getName() +
@@ -318,9 +323,9 @@ public class TestListCorruptFileBlocks {
}
// Validate we get all the corrupt files
LOG.info("Namenode has bad files. " + numCorrupt);
- assertTrue(numCorrupt == 3);
- // test the paging here
+ assertEquals(3, numCorrupt);
+ // test the paging here
FSNamesystem.CorruptFileBlockInfo[] cfb = corruptFileBlocks
.toArray(new FSNamesystem.CorruptFileBlockInfo[0]);
// now get the 2nd and 3rd file that is corrupt
@@ -331,7 +336,7 @@ public class TestListCorruptFileBlocks {
FSNamesystem.CorruptFileBlockInfo[] ncfb = nextCorruptFileBlocks
.toArray(new FSNamesystem.CorruptFileBlockInfo[0]);
numCorrupt = nextCorruptFileBlocks.size();
- assertTrue(numCorrupt == 2);
+ assertEquals(2, numCorrupt);
assertTrue(ncfb[0].block.getBlockName()
.equalsIgnoreCase(cfb[1].block.getBlockName()));
@@ -339,14 +344,14 @@ public class TestListCorruptFileBlocks {
namenode.getNamesystem()
.listCorruptFileBlocks("/corruptData", cookie);
numCorrupt = corruptFileBlocks.size();
- assertTrue(numCorrupt == 0);
+ assertEquals(0, numCorrupt);
// Do a listing on a dir which doesn't have any corrupt blocks and
// validate
util.createFiles(fs, "/goodData");
corruptFileBlocks =
namenode.getNamesystem().listCorruptFileBlocks("/goodData", null);
numCorrupt = corruptFileBlocks.size();
- assertTrue(numCorrupt == 0);
+ assertEquals(0, numCorrupt);
util.cleanup(fs, "/corruptData");
util.cleanup(fs, "/goodData");
} finally {
@@ -390,7 +395,7 @@ public class TestListCorruptFileBlocks {
RemoteIterator<Path> corruptFileBlocks =
dfs.listCorruptFileBlocks(new Path("/corruptData"));
int numCorrupt = countPaths(corruptFileBlocks);
- assertTrue(numCorrupt == 0);
+ assertEquals(0, numCorrupt);
// delete the blocks
String bpid = cluster.getNamesystem().getBlockPoolId();
// For loop through number of datadirectories per datanode (2)
@@ -426,7 +431,7 @@ public class TestListCorruptFileBlocks {
}
// Validate we get all the corrupt files
LOG.info("Namenode has bad files. " + numCorrupt);
- assertTrue(numCorrupt == 3);
+ assertEquals(3, numCorrupt);
util.cleanup(fs, "/corruptData");
util.cleanup(fs, "/goodData");
@@ -465,8 +470,9 @@ public class TestListCorruptFileBlocks {
final NameNode namenode = cluster.getNameNode();
Collection<FSNamesystem.CorruptFileBlockInfo> badFiles = namenode.
getNamesystem().listCorruptFileBlocks("/srcdat2", null);
- assertTrue("Namenode has " + badFiles.size() + " corrupt files. Expecting none.",
- badFiles.size() == 0);
+ assertEquals(
+ "Namenode has " + badFiles.size() + " corrupt files. Expecting none.",
+ 0, badFiles.size());
// Now deliberately corrupt blocks from all files
final String bpid = cluster.getNamesystem().getBlockPoolId();
@@ -555,7 +561,7 @@ public class TestListCorruptFileBlocks {
RemoteIterator<Path> corruptFileBlocks = dfs
.listCorruptFileBlocks(new Path("corruptData"));
int numCorrupt = countPaths(corruptFileBlocks);
- assertTrue(numCorrupt == 0);
+ assertEquals(0, numCorrupt);
// delete the blocks
String bpid = cluster.getNamesystem().getBlockPoolId();
@@ -589,7 +595,7 @@ public class TestListCorruptFileBlocks {
}
// Validate we get all the corrupt files
LOG.info("Namenode has bad files. " + numCorrupt);
- assertTrue("Failed to get corrupt files!", numCorrupt == 3);
+ assertEquals("Failed to get corrupt files!", 3, numCorrupt);
util.cleanup(fs, "corruptData");
} finally {