Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2016/12/20 22:01:36 UTC
hadoop git commit: HDFS-11182. Update DataNode to use DatasetVolumeChecker. Contributed by Arpit Agarwal.
Repository: hadoop
Updated Branches:
refs/heads/trunk 5daa8d863 -> f678080db
HDFS-11182. Update DataNode to use DatasetVolumeChecker. Contributed by Arpit Agarwal.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f678080d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f678080d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f678080d
Branch: refs/heads/trunk
Commit: f678080dbd25a218e0406463a3c3a1fc03680702
Parents: 5daa8d8
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Tue Dec 20 13:53:07 2016 -0800
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Tue Dec 20 13:53:32 2016 -0800
----------------------------------------------------------------------
.../hadoop/hdfs/server/datanode/DataNode.java | 130 ++++++++-----------
.../datanode/checker/DatasetVolumeChecker.java | 116 +++++++++++------
.../server/datanode/fsdataset/FsDatasetSpi.java | 3 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 5 +-
.../datanode/fsdataset/impl/FsVolumeImpl.java | 7 -
.../datanode/fsdataset/impl/FsVolumeList.java | 25 +---
.../blockmanagement/TestBlockStatsMXBean.java | 4 +
.../server/datanode/SimulatedFSDataset.java | 18 ++-
.../datanode/TestDataNodeHotSwapVolumes.java | 3 +
.../datanode/TestDataNodeVolumeFailure.java | 3 +
.../TestDataNodeVolumeFailureReporting.java | 3 +
.../TestDataNodeVolumeFailureToleration.java | 3 +
.../hdfs/server/datanode/TestDiskError.java | 24 ++--
.../checker/TestDatasetVolumeChecker.java | 17 ++-
.../TestDatasetVolumeCheckerFailures.java | 45 ++++---
.../extdataset/ExternalDatasetImpl.java | 3 +-
.../fsdataset/impl/TestFsDatasetImpl.java | 84 ++----------
.../fsdataset/impl/TestFsVolumeList.java | 37 ------
18 files changed, 233 insertions(+), 297 deletions(-)
----------------------------------------------------------------------
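
The heart of the change: the DataNode's private disk-checker thread is replaced by the asynchronous, callback-driven DatasetVolumeChecker. Below is a minimal sketch of driving the new API; only DatasetVolumeChecker, its Callback, FsDatasetSpi and FsVolumeSpi come from this patch, while the surrounding class and method names are hypothetical.

import java.util.Set;

import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

public class VolumeCheckSketch {
  /**
   * Schedule an asynchronous check of every volume in the dataset.
   * Returns false if the checker skipped the run (for example, the
   * minimum gap since the last check has not yet elapsed).
   */
  static boolean triggerCheck(DatasetVolumeChecker checker,
      FsDatasetSpi<? extends FsVolumeSpi> dataset) {
    return checker.checkAllVolumesAsync(dataset,
        (Set<FsVolumeSpi> healthy, Set<FsVolumeSpi> failed) -> {
          // Invoked exactly once, after every scheduled volume check
          // has completed or timed out.
          if (!failed.isEmpty()) {
            System.err.println("Detected failed volumes: " + failed);
          }
        });
  }
}
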
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 794b1ad..a94c4b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -74,6 +74,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -85,7 +86,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -108,6 +108,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
@@ -369,11 +370,7 @@ public class DataNode extends ReconfigurableBase
SaslDataTransferClient saslClient;
SaslDataTransferServer saslServer;
private ObjectName dataNodeInfoBeanName;
- private Thread checkDiskErrorThread = null;
- protected final int checkDiskErrorInterval;
- private boolean checkDiskErrorFlag = false;
- private Object checkDiskErrorMutex = new Object();
- private long lastDiskErrorCheck;
+ private volatile long lastDiskErrorCheck;
private String supergroup;
private boolean isPermissionEnabled;
private String dnUserName = null;
@@ -389,6 +386,7 @@ public class DataNode extends ReconfigurableBase
@Nullable
private final StorageLocationChecker storageLocationChecker;
+ private final DatasetVolumeChecker volumeChecker;
private final SocketFactory socketFactory;
@@ -407,7 +405,7 @@ public class DataNode extends ReconfigurableBase
*/
@VisibleForTesting
@InterfaceAudience.LimitedPrivate("HDFS")
- DataNode(final Configuration conf) {
+ DataNode(final Configuration conf) throws DiskErrorException {
super(conf);
this.tracer = createTracer(conf);
this.tracerConfigurationManager =
@@ -420,11 +418,10 @@ public class DataNode extends ReconfigurableBase
this.connectToDnViaHostname = false;
this.blockScanner = new BlockScanner(this, this.getConf());
this.pipelineSupportECN = false;
- this.checkDiskErrorInterval =
- ThreadLocalRandom.current().nextInt(5000, (int) (5000 * 1.25));
this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
initOOBTimeout();
storageLocationChecker = null;
+ volumeChecker = new DatasetVolumeChecker(conf, new Timer());
}
/**
@@ -464,8 +461,7 @@ public class DataNode extends ReconfigurableBase
",hdfs-" +
conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED");
- this.checkDiskErrorInterval =
- ThreadLocalRandom.current().nextInt(5000, (int) (5000 * 1.25));
+ this.volumeChecker = new DatasetVolumeChecker(conf, new Timer());
// Determine whether we should try to pass file descriptors to clients.
if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,
@@ -1918,11 +1914,6 @@ public class DataNode extends ReconfigurableBase
}
}
- // Interrupt the checkDiskErrorThread and terminate it.
- if(this.checkDiskErrorThread != null) {
- this.checkDiskErrorThread.interrupt();
- }
-
// Record the time of initial notification
long timeNotified = Time.monotonicNow();
@@ -1944,6 +1935,8 @@ public class DataNode extends ReconfigurableBase
}
}
+ volumeChecker.shutdownAndWait(1, TimeUnit.SECONDS);
+
if (storageLocationChecker != null) {
storageLocationChecker.shutdownAndWait(1, TimeUnit.SECONDS);
}
@@ -2051,16 +2044,19 @@ public class DataNode extends ReconfigurableBase
* Check if there is a disk failure asynchronously and if so, handle the error
*/
public void checkDiskErrorAsync() {
- synchronized(checkDiskErrorMutex) {
- checkDiskErrorFlag = true;
- if(checkDiskErrorThread == null) {
- startCheckDiskErrorThread();
- checkDiskErrorThread.start();
- LOG.info("Starting CheckDiskError Thread");
- }
- }
+ volumeChecker.checkAllVolumesAsync(
+ data, (healthyVolumes, failedVolumes) -> {
+ if (failedVolumes.size() > 0) {
+ LOG.warn("checkDiskErrorAsync callback got {} failed volumes: {}",
+ failedVolumes.size(), failedVolumes);
+ } else {
+ LOG.debug("checkDiskErrorAsync: no volume failures detected");
+ }
+ lastDiskErrorCheck = Time.monotonicNow();
+ handleVolumeFailures(failedVolumes);
+ });
}
-
+
private void handleDiskError(String errMsgr) {
final boolean hasEnoughResources = data.hasEnoughResource();
LOG.warn("DataNode.handleDiskError: Keep Running: " + hasEnoughResources);
@@ -3208,11 +3204,40 @@ public class DataNode extends ReconfigurableBase
}
/**
- * Check the disk error
+ * Check the disk error synchronously.
*/
- private void checkDiskError() {
- Set<StorageLocation> unhealthyLocations = data.checkDataDir();
- if (unhealthyLocations != null && !unhealthyLocations.isEmpty()) {
+ @VisibleForTesting
+ public void checkDiskError() throws IOException {
+ Set<FsVolumeSpi> unhealthyVolumes;
+ try {
+ unhealthyVolumes = volumeChecker.checkAllVolumes(data);
+ lastDiskErrorCheck = Time.monotonicNow();
+ } catch (InterruptedException e) {
+ LOG.error("Interruped while running disk check", e);
+ throw new IOException("Interrupted while running disk check", e);
+ }
+
+ if (unhealthyVolumes.size() > 0) {
+ LOG.warn("checkDiskError got {} failed volumes - {}",
+ unhealthyVolumes.size(), unhealthyVolumes);
+ handleVolumeFailures(unhealthyVolumes);
+ } else {
+ LOG.debug("checkDiskError encountered no failures");
+ }
+ }
+
+ private void handleVolumeFailures(Set<FsVolumeSpi> unhealthyVolumes) {
+ data.handleVolumeFailures(unhealthyVolumes);
+ Set<StorageLocation> unhealthyLocations = new HashSet<>(
+ unhealthyVolumes.size());
+
+ if (!unhealthyVolumes.isEmpty()) {
+ StringBuilder sb = new StringBuilder("DataNode failed volumes:");
+ for (FsVolumeSpi vol : unhealthyVolumes) {
+ unhealthyLocations.add(vol.getStorageLocation());
+ sb.append(vol.getStorageLocation()).append(";");
+ }
+
try {
// Remove all unhealthy volumes from DataNode.
removeVolumes(unhealthyLocations, false);
@@ -3220,56 +3245,13 @@ public class DataNode extends ReconfigurableBase
LOG.warn("Error occurred when removing unhealthy storage dirs: "
+ e.getMessage(), e);
}
- StringBuilder sb = new StringBuilder("DataNode failed volumes:");
- for (StorageLocation location : unhealthyLocations) {
- sb.append(location + ";");
- }
+ LOG.info(sb.toString());
handleDiskError(sb.toString());
}
}
- /**
- * Starts a new thread which will check for disk error check request
- * every 5 sec
- */
- private void startCheckDiskErrorThread() {
- checkDiskErrorThread = new Thread(new Runnable() {
- @Override
- public void run() {
- while(shouldRun) {
- boolean tempFlag ;
- synchronized(checkDiskErrorMutex) {
- tempFlag = checkDiskErrorFlag;
- checkDiskErrorFlag = false;
- }
- if(tempFlag) {
- try {
- checkDiskError();
- } catch (Exception e) {
- LOG.warn("Unexpected exception occurred while checking disk error " + e);
- checkDiskErrorThread = null;
- return;
- }
- synchronized(checkDiskErrorMutex) {
- lastDiskErrorCheck = Time.monotonicNow();
- }
- }
- try {
- Thread.sleep(checkDiskErrorInterval);
- } catch (InterruptedException e) {
- LOG.debug("InterruptedException in check disk error thread", e);
- checkDiskErrorThread = null;
- return;
- }
- }
- }
- });
- }
-
public long getLastDiskErrorCheck() {
- synchronized(checkDiskErrorMutex) {
- return lastDiskErrorCheck;
- }
+ return lastDiskErrorCheck;
}
@Override
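
checkDiskError() is now a synchronous call that blocks on DatasetVolumeChecker.checkAllVolumes() and converts an interrupt into an IOException, instead of setting a flag for a polling thread. A hedged sketch of exercising it, assuming a DataNode handle obtained from a test harness:

import java.io.IOException;

import org.apache.hadoop.hdfs.server.datanode.DataNode;

class SyncDiskCheckSketch {
  /**
   * Run one synchronous disk check and report whether the last-check
   * timestamp advanced; checkDiskError() blocks until every volume
   * check finishes or the checker's timeout elapses.
   */
  static boolean checkOnce(DataNode dn) throws IOException {
    final long before = dn.getLastDiskErrorCheck();
    dn.checkDiskError();
    return dn.getLastDiskErrorCheck() > before;
  }
}
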
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
index 8a57812..ba09d23 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
@@ -27,7 +27,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -161,37 +160,54 @@ public class DatasetVolumeChecker {
* @param dataset - FsDatasetSpi to be checked.
* @return set of failed volumes.
*/
- public Set<StorageLocation> checkAllVolumes(
+ public Set<FsVolumeSpi> checkAllVolumes(
final FsDatasetSpi<? extends FsVolumeSpi> dataset)
throws InterruptedException {
-
- if (timer.monotonicNow() - lastAllVolumesCheck < minDiskCheckGapMs) {
+ final long gap = timer.monotonicNow() - lastAllVolumesCheck;
+ if (gap < minDiskCheckGapMs) {
numSkippedChecks.incrementAndGet();
+ LOG.trace(
+ "Skipped checking all volumes, time since last check {} is less " +
+ "than the minimum gap between checks ({} ms).",
+ gap, minDiskCheckGapMs);
return Collections.emptySet();
}
- lastAllVolumesCheck = timer.monotonicNow();
- final Set<StorageLocation> healthyVolumes = new HashSet<>();
- final Set<StorageLocation> failedVolumes = new HashSet<>();
- final Set<StorageLocation> allVolumes = new HashSet<>();
-
final FsDatasetSpi.FsVolumeReferences references =
dataset.getFsVolumeReferences();
- final CountDownLatch resultsLatch = new CountDownLatch(references.size());
+
+ if (references.size() == 0) {
+ LOG.warn("checkAllVolumesAsync - no volumes can be referenced");
+ return Collections.emptySet();
+ }
+
+ lastAllVolumesCheck = timer.monotonicNow();
+ final Set<FsVolumeSpi> healthyVolumes = new HashSet<>();
+ final Set<FsVolumeSpi> failedVolumes = new HashSet<>();
+ final Set<FsVolumeSpi> allVolumes = new HashSet<>();
+
+ final AtomicLong numVolumes = new AtomicLong(references.size());
+ final CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < references.size(); ++i) {
final FsVolumeReference reference = references.getReference(i);
- allVolumes.add(reference.getVolume().getStorageLocation());
+ allVolumes.add(reference.getVolume());
ListenableFuture<VolumeCheckResult> future =
delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
LOG.info("Scheduled health check for volume {}", reference.getVolume());
Futures.addCallback(future, new ResultHandler(
- reference, healthyVolumes, failedVolumes, resultsLatch, null));
+ reference, healthyVolumes, failedVolumes, numVolumes, new Callback() {
+ @Override
+ public void call(Set<FsVolumeSpi> ignored1,
+ Set<FsVolumeSpi> ignored2) {
+ latch.countDown();
+ }
+ }));
}
// Wait until our timeout elapses, after which we give up on
// the remaining volumes.
- if (!resultsLatch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) {
+ if (!latch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) {
LOG.warn("checkAllVolumes timed out after {} ms" +
maxAllowedTimeForCheckMs);
}
@@ -225,18 +241,28 @@ public class DatasetVolumeChecker {
public boolean checkAllVolumesAsync(
final FsDatasetSpi<? extends FsVolumeSpi> dataset,
Callback callback) {
-
- if (timer.monotonicNow() - lastAllVolumesCheck < minDiskCheckGapMs) {
+ final long gap = timer.monotonicNow() - lastAllVolumesCheck;
+ if (gap < minDiskCheckGapMs) {
numSkippedChecks.incrementAndGet();
+ LOG.trace(
+ "Skipped checking all volumes, time since last check {} is less " +
+ "than the minimum gap between checks ({} ms).",
+ gap, minDiskCheckGapMs);
return false;
}
- lastAllVolumesCheck = timer.monotonicNow();
- final Set<StorageLocation> healthyVolumes = new HashSet<>();
- final Set<StorageLocation> failedVolumes = new HashSet<>();
final FsDatasetSpi.FsVolumeReferences references =
dataset.getFsVolumeReferences();
- final CountDownLatch latch = new CountDownLatch(references.size());
+
+ if (references.size() == 0) {
+ LOG.warn("checkAllVolumesAsync - no volumes can be referenced");
+ return false;
+ }
+
+ lastAllVolumesCheck = timer.monotonicNow();
+ final Set<FsVolumeSpi> healthyVolumes = new HashSet<>();
+ final Set<FsVolumeSpi> failedVolumes = new HashSet<>();
+ final AtomicLong numVolumes = new AtomicLong(references.size());
LOG.info("Checking {} volumes", references.size());
for (int i = 0; i < references.size(); ++i) {
@@ -245,7 +271,7 @@ public class DatasetVolumeChecker {
ListenableFuture<VolumeCheckResult> future =
delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
Futures.addCallback(future, new ResultHandler(
- reference, healthyVolumes, failedVolumes, latch, callback));
+ reference, healthyVolumes, failedVolumes, numVolumes, callback));
}
numAsyncDatasetChecks.incrementAndGet();
return true;
@@ -260,8 +286,8 @@ public class DatasetVolumeChecker {
* @param healthyVolumes set of volumes that passed disk checks.
* @param failedVolumes set of volumes that failed disk checks.
*/
- void call(Set<StorageLocation> healthyVolumes,
- Set<StorageLocation> failedVolumes);
+ void call(Set<FsVolumeSpi> healthyVolumes,
+ Set<FsVolumeSpi> failedVolumes);
}
/**
@@ -273,8 +299,10 @@ public class DatasetVolumeChecker {
*
* @param volume the volume that is to be checked.
* @param callback callback to be invoked when the volume check completes.
+ * @return true if the check was scheduled and the callback will be invoked.
+ * false otherwise.
*/
- public void checkVolume(
+ public boolean checkVolume(
final FsVolumeSpi volume,
Callback callback) {
FsVolumeReference volumeReference;
@@ -283,14 +311,15 @@ public class DatasetVolumeChecker {
} catch (ClosedChannelException e) {
// The volume has already been closed.
callback.call(new HashSet<>(), new HashSet<>());
- return;
+ return false;
}
ListenableFuture<VolumeCheckResult> future =
delegateChecker.schedule(volume, IGNORED_CONTEXT);
numVolumeChecks.incrementAndGet();
Futures.addCallback(future, new ResultHandler(
volumeReference, new HashSet<>(), new HashSet<>(),
- new CountDownLatch(1), callback));
+ new AtomicLong(1), callback));
+ return true;
}
/**
@@ -299,26 +328,35 @@ public class DatasetVolumeChecker {
private class ResultHandler
implements FutureCallback<VolumeCheckResult> {
private final FsVolumeReference reference;
- private final Set<StorageLocation> failedVolumes;
- private final Set<StorageLocation> healthyVolumes;
- private final CountDownLatch latch;
- private final AtomicLong numVolumes;
+ private final Set<FsVolumeSpi> failedVolumes;
+ private final Set<FsVolumeSpi> healthyVolumes;
+ private final AtomicLong volumeCounter;
@Nullable
private final Callback callback;
+ /**
+ *
+ * @param reference FsVolumeReference to be released when the check is
+ * complete.
+ * @param healthyVolumes set of healthy volumes. If the disk check is
+ * successful, add the volume here.
+ * @param failedVolumes set of failed volumes. If the disk check fails,
+ * add the volume here.
+ * @param volumeCounter number of outstanding volume checks; the
+ * callback fires when it is decremented to zero.
+ * @param callback invoked once all volume checks have completed
+ * or timed out.
+ */
ResultHandler(FsVolumeReference reference,
- Set<StorageLocation> healthyVolumes,
- Set<StorageLocation> failedVolumes,
- CountDownLatch latch,
+ Set<FsVolumeSpi> healthyVolumes,
+ Set<FsVolumeSpi> failedVolumes,
+ AtomicLong volumeCounter,
@Nullable Callback callback) {
Preconditions.checkState(reference != null);
this.reference = reference;
this.healthyVolumes = healthyVolumes;
this.failedVolumes = failedVolumes;
- this.latch = latch;
+ this.volumeCounter = volumeCounter;
this.callback = callback;
- numVolumes = new AtomicLong(latch.getCount());
}
@Override
@@ -355,13 +393,13 @@ public class DatasetVolumeChecker {
private void markHealthy() {
synchronized (DatasetVolumeChecker.this) {
- healthyVolumes.add(reference.getVolume().getStorageLocation());
+ healthyVolumes.add(reference.getVolume());
}
}
private void markFailed() {
synchronized (DatasetVolumeChecker.this) {
- failedVolumes.add(reference.getVolume().getStorageLocation());
+ failedVolumes.add(reference.getVolume());
}
}
@@ -372,10 +410,8 @@ public class DatasetVolumeChecker {
private void invokeCallback() {
try {
- latch.countDown();
-
- if (numVolumes.decrementAndGet() == 0 &&
- callback != null) {
+ final long remaining = volumeCounter.decrementAndGet();
+ if (callback != null && remaining == 0) {
callback.call(healthyVolumes, failedVolumes);
}
} catch(Exception e) {
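
The ResultHandler rework above swaps the per-volume CountDownLatch for a shared AtomicLong, so the aggregate callback fires exactly once, from whichever volume check finishes last. The same countdown pattern in isolation (class and method names here are illustrative, not from the patch):

import java.util.concurrent.atomic.AtomicLong;

/**
 * N asynchronous completions share one counter; whichever completion
 * decrements it to zero fires the aggregate callback, exactly once.
 */
class CountdownCallbackSketch {
  private final AtomicLong remaining;
  private final Runnable onAllDone;

  CountdownCallbackSketch(long numTasks, Runnable onAllDone) {
    this.remaining = new AtomicLong(numTasks);
    this.onAllDone = onAllDone;
  }

  /** Called from each completion, success or failure. Thread-safe. */
  void oneDone() {
    if (remaining.decrementAndGet() == 0) {
      onAllDone.run();
    }
  }
}

The synchronous checkAllVolumes() composes the two: its internal callback counts down a single CountDownLatch(1), which the caller then awaits with a timeout.
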
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 30f045f..9e979f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -494,8 +494,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
/**
- * Check if all the data directories are healthy
- * @return A set of unhealthy data directories.
+ * Handle volumes marked as failed by removing them from active use.
+ * @param failedVolumes the set of volumes that failed disk checks.
*/
- Set<StorageLocation> checkDataDir();
+ void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes);
/**
* Shutdown the FSDataset
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 35561cd..0d5a12c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -2067,10 +2067,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
- * if some volumes failed - the caller must emove all the blocks that belong
- * to these failed volumes.
- * @return the failed volumes. Returns null if no volume failed.
+ * If some volumes failed, the caller must remove all the blocks that
+ * belong to those failed volumes.
+ * @param failedVolumes the set of volumes that failed disk checks.
*/
@Override // FsDatasetSpi
- public Set<StorageLocation> checkDataDir() {
- return volumes.checkDirs();
+ public void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
+ volumes.handleVolumeFailures(failedVolumes);
}
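
Taken together, the FsDatasetSpi and FsDatasetImpl hunks invert the flow: the dataset no longer probes its own directories (the old checkDataDir()/checkDirs() path); the checker finds failed volumes and pushes them down. A sketch of what an implementation of the new hook is expected to do (the helper method is illustrative):

import java.util.Set;

import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

class HandleVolumeFailuresSketch {
  /**
   * The dataset is handed the volumes that DatasetVolumeChecker found
   * unhealthy and takes them out of service; it no longer detects the
   * failures itself.
   */
  void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
    for (FsVolumeSpi vol : failedVolumes) {
      removeFromService(vol);
    }
  }

  // Illustrative stand-in for recording the failure and dropping the
  // volume's replicas, as FsVolumeList.handleVolumeFailures() does.
  private void removeFromService(FsVolumeSpi vol) {
    System.out.println("Removing failed volume " + vol.getStorageLocation());
  }
}
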
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index e28ee27..753c083 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -959,13 +959,6 @@ public class FsVolumeImpl implements FsVolumeSpi {
return cacheExecutor;
}
- void checkDirs() throws DiskErrorException {
- // TODO:FEDERATION valid synchronization
- for(BlockPoolSlice s : bpSlices.values()) {
- s.checkDirs();
- }
- }
-
@Override
public VolumeCheckResult check(VolumeCheckContext ignored)
throws DiskErrorException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index cf9c319..64921d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -44,7 +43,6 @@ import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.AutoCloseableLock;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Time;
class FsVolumeList {
@@ -235,23 +233,14 @@ class FsVolumeList {
- * Use {@link checkDirsLock} to allow only one instance of checkDirs() call.
- *
- * @return list of all the failed volumes.
+ * Use {@link checkDirsLock} to allow only one instance of this method
+ * to run at a time.
+ * @param failedVolumes the set of volumes that failed disk checks.
*/
- Set<StorageLocation> checkDirs() {
+ void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
try (AutoCloseableLock lock = checkDirsLock.acquire()) {
- Set<StorageLocation> failedLocations = null;
- // Make a copy of volumes for performing modification
- final List<FsVolumeImpl> volumeList = getVolumes();
- for(Iterator<FsVolumeImpl> i = volumeList.iterator(); i.hasNext(); ) {
- final FsVolumeImpl fsv = i.next();
+ for(FsVolumeSpi vol : failedVolumes) {
+ FsVolumeImpl fsv = (FsVolumeImpl) vol;
try (FsVolumeReference ref = fsv.obtainReference()) {
- fsv.checkDirs();
- } catch (DiskErrorException e) {
- FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ", e);
- if (failedLocations == null) {
- failedLocations = new HashSet<>(1);
- }
- failedLocations.add(fsv.getStorageLocation());
addVolumeFailureInfo(fsv);
removeVolume(fsv);
} catch (ClosedChannelException e) {
@@ -262,13 +251,7 @@ class FsVolumeList {
}
}
- if (failedLocations != null && failedLocations.size() > 0) {
- FsDatasetImpl.LOG.warn("Completed checkDirs. Found " +
- failedLocations.size() + " failure volumes.");
- }
-
waitVolumeRemoved(5000, checkDirsLockCondition);
- return failedLocations;
}
}
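
handleVolumeFailures() keeps FsVolumeList's reference-counting idiom: a volume is only acted on while a reference to it is held, and a ClosedChannelException means it was already removed. The idiom in isolation (the logging is illustrative):

import java.io.IOException;
import java.nio.channels.ClosedChannelException;

import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

class VolumeReferenceSketch {
  /** Act on a volume only while a reference to it is held. */
  static void withReference(FsVolumeSpi volume) {
    try (FsVolumeReference ref = volume.obtainReference()) {
      // Safe to use ref.getVolume() here; the volume cannot be
      // removed out from under us while the reference is held.
    } catch (ClosedChannelException e) {
      // The volume was already closed and removed; nothing to do.
    } catch (IOException e) {
      System.err.println("Error releasing volume reference: " + e);
    }
  }
}
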
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
index 476565dc..b7583c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
@@ -30,9 +30,11 @@ import java.net.URL;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -52,6 +54,8 @@ public class TestBlockStatsMXBean {
@Before
public void setup() throws IOException {
HdfsConfiguration conf = new HdfsConfiguration();
+ conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+ 0, TimeUnit.MILLISECONDS);
cluster = null;
StorageType[][] types = new StorageType[6][];
for (int i=0; i<3; i++) {
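
The same two-line setting recurs in most of the test changes below. DatasetVolumeChecker silently skips a whole-dataset check when less than dfs.datanode.disk.check.min.gap has elapsed since the previous one, so tests that trigger back-to-back checks must zero the gap. A sketch of the shared setup (the helper name is hypothetical):

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

class TestConfSketch {
  /** Build a conf whose volume checker never skips back-to-back checks. */
  static HdfsConfiguration withNoMinCheckGap() {
    HdfsConfiguration conf = new HdfsConfiguration();
    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
        0, TimeUnit.MILLISECONDS);
    return conf;
  }
}
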
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 484fbe4..8472eca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -489,7 +489,17 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public FsVolumeReference obtainReference() throws ClosedChannelException {
- return null;
+ return new FsVolumeReference() {
+ @Override
+ public void close() throws IOException {
+ // no-op.
+ }
+
+ @Override
+ public FsVolumeSpi getVolume() {
+ return SimulatedVolume.this;
+ }
+ };
}
@Override
@@ -1078,9 +1088,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
- public Set<StorageLocation> checkDataDir() {
- // nothing to check for simulated data set
- return null;
+ public void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
}
@Override // FsDatasetSpi
@@ -1349,7 +1357,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public FsVolumeReferences getFsVolumeReferences() {
- throw new UnsupportedOperationException();
+ return new FsVolumeReferences(Collections.singletonList(volume));
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index 5607ccc..e31e783 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -62,6 +62,7 @@ import java.util.Map;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
@@ -113,6 +114,8 @@ public class TestDataNodeHotSwapVolumes {
1000);
/* Allow 1 volume failure */
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+ conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+ 0, TimeUnit.MILLISECONDS);
MiniDFSNNTopology nnTopology =
MiniDFSNNTopology.simpleFederatedTopology(numNameNodes);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index 8db7658..06e2871 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -33,6 +33,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
@@ -118,6 +119,8 @@ public class TestDataNodeVolumeFailure {
// Allow a single volume failure (there are two volumes)
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30);
+ conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+ 0, TimeUnit.MILLISECONDS);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dn_num).build();
cluster.waitActive();
fs = cluster.getFileSystem();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
index aa9b7aa..3d37b10 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
@@ -30,6 +30,7 @@ import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -659,6 +660,8 @@ public class TestDataNodeVolumeFailureReporting {
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
failedVolumesTolerated);
+ conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+ 0, TimeUnit.MILLISECONDS);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes)
.storagesPerDatanode(storagesPerDatanode).build();
cluster.waitActive();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java
index 5ff7d9b..de50ccb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -144,6 +145,8 @@ public class TestDataNodeVolumeFailureToleration {
// Bring up two additional datanodes that need both of their volumes
// functioning in order to stay up.
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 0);
+ conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+ 0, TimeUnit.MILLISECONDS);
cluster.startDataNodes(conf, 2, true, null, null);
cluster.waitActive();
final DatanodeManager dm = cluster.getNamesystem().getBlockManager(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
index 56dee43..cd86720 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
@@ -26,7 +26,9 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.util.concurrent.TimeUnit;
+import com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -49,8 +51,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -69,6 +71,9 @@ public class TestDiskError {
public void setUp() throws Exception {
conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512L);
+ conf.setTimeDuration(
+ DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+ 0, TimeUnit.MILLISECONDS);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
@@ -213,19 +218,22 @@ public class TestDiskError {
* Before refactoring the code the above function was not getting called
* @throws IOException, InterruptedException
*/
- @Test
- public void testcheckDiskError() throws IOException, InterruptedException {
+ @Test(timeout=60000)
+ public void testcheckDiskError() throws Exception {
if(cluster.getDataNodes().size() <= 0) {
cluster.startDataNodes(conf, 1, true, null, null);
cluster.waitActive();
}
DataNode dataNode = cluster.getDataNodes().get(0);
- long slackTime = dataNode.checkDiskErrorInterval/2;
//checking for disk error
- dataNode.checkDiskErrorAsync();
- Thread.sleep(dataNode.checkDiskErrorInterval);
- long lastDiskErrorCheck = dataNode.getLastDiskErrorCheck();
- assertTrue("Disk Error check is not performed within " + dataNode.checkDiskErrorInterval + " ms", ((Time.monotonicNow()-lastDiskErrorCheck) < (dataNode.checkDiskErrorInterval + slackTime)));
+ final long lastCheckTimestamp = dataNode.getLastDiskErrorCheck();
+ dataNode.checkDiskError();
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return dataNode.getLastDiskErrorCheck() > lastCheckTimestamp;
+ }
+ }, 100, 60000);
}
@Test
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
index fa809d1..50096ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
@@ -35,7 +35,10 @@ import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -103,8 +106,8 @@ public class TestDatasetVolumeChecker {
*/
checker.checkVolume(volume, new DatasetVolumeChecker.Callback() {
@Override
- public void call(Set<StorageLocation> healthyVolumes,
- Set<StorageLocation> failedVolumes) {
+ public void call(Set<FsVolumeSpi> healthyVolumes,
+ Set<FsVolumeSpi> failedVolumes) {
numCallbackInvocations.incrementAndGet();
if (expectedVolumeHealth != null && expectedVolumeHealth != FAILED) {
assertThat(healthyVolumes.size(), is(1));
@@ -138,7 +141,7 @@ public class TestDatasetVolumeChecker {
new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer());
checker.setDelegateChecker(new DummyChecker());
- Set<StorageLocation> failedVolumes = checker.checkAllVolumes(dataset);
+ Set<FsVolumeSpi> failedVolumes = checker.checkAllVolumes(dataset);
LOG.info("Got back {} failed volumes", failedVolumes.size());
if (expectedVolumeHealth == null || expectedVolumeHealth == FAILED) {
@@ -174,8 +177,8 @@ public class TestDatasetVolumeChecker {
dataset, new DatasetVolumeChecker.Callback() {
@Override
public void call(
- Set<StorageLocation> healthyVolumes,
- Set<StorageLocation> failedVolumes) {
+ Set<FsVolumeSpi> healthyVolumes,
+ Set<FsVolumeSpi> failedVolumes) {
LOG.info("Got back {} failed volumes", failedVolumes.size());
if (expectedVolumeHealth == null ||
expectedVolumeHealth == FAILED) {
@@ -236,7 +239,7 @@ public class TestDatasetVolumeChecker {
return dataset;
}
- private static List<FsVolumeSpi> makeVolumes(
+ static List<FsVolumeSpi> makeVolumes(
int numVolumes, VolumeCheckResult health) throws Exception {
final List<FsVolumeSpi> volumes = new ArrayList<>(numVolumes);
for (int i = 0; i < numVolumes; ++i) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
index b57d84f..16c333b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
import org.apache.hadoop.util.FakeTimer;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -47,6 +48,19 @@ public class TestDatasetVolumeCheckerFailures {
public static final Logger LOG = LoggerFactory.getLogger(
TestDatasetVolumeCheckerFailures.class);
+ private FakeTimer timer;
+ private Configuration conf;
+
+ private static final long MIN_DISK_CHECK_GAP_MS = 1000; // 1 second.
+
+ @Before
+ public void commonInit() {
+ timer = new FakeTimer();
+ conf = new HdfsConfiguration();
+ conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+ MIN_DISK_CHECK_GAP_MS, TimeUnit.MILLISECONDS);
+ }
+
/**
* Test timeout in {@link DatasetVolumeChecker#checkAllVolumes}.
* @throws Exception
@@ -61,14 +75,13 @@ public class TestDatasetVolumeCheckerFailures {
TestDatasetVolumeChecker.makeDataset(volumes);
// Create a disk checker with a very low timeout.
- final HdfsConfiguration conf = new HdfsConfiguration();
conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
1, TimeUnit.SECONDS);
final DatasetVolumeChecker checker =
new DatasetVolumeChecker(conf, new FakeTimer());
// Ensure that the hung volume is detected as failed.
- Set<StorageLocation> failedVolumes = checker.checkAllVolumes(dataset);
+ Set<FsVolumeSpi> failedVolumes = checker.checkAllVolumes(dataset);
assertThat(failedVolumes.size(), is(1));
}
@@ -86,10 +99,10 @@ public class TestDatasetVolumeCheckerFailures {
final FsDatasetSpi<FsVolumeSpi> dataset =
TestDatasetVolumeChecker.makeDataset(volumes);
- DatasetVolumeChecker checker =
- new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer());
- Set<StorageLocation> failedVolumes = checker.checkAllVolumes(dataset);
+ DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer);
+ Set<FsVolumeSpi> failedVolumes = checker.checkAllVolumes(dataset);
assertThat(failedVolumes.size(), is(0));
+ assertThat(checker.getNumSyncDatasetChecks(), is(0L));
// The closed volume should not have been checked as it cannot
// be referenced.
@@ -98,13 +111,10 @@ public class TestDatasetVolumeCheckerFailures {
@Test(timeout=60000)
public void testMinGapIsEnforcedForSyncChecks() throws Exception {
+ final List<FsVolumeSpi> volumes =
+ TestDatasetVolumeChecker.makeVolumes(1, VolumeCheckResult.HEALTHY);
final FsDatasetSpi<FsVolumeSpi> dataset =
- TestDatasetVolumeChecker.makeDataset(Collections.emptyList());
- final FakeTimer timer = new FakeTimer();
- final Configuration conf = new HdfsConfiguration();
- final long minGapMs = 100;
- conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
- minGapMs, TimeUnit.MILLISECONDS);
+ TestDatasetVolumeChecker.makeDataset(volumes);
final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer);
checker.checkAllVolumes(dataset);
@@ -116,7 +126,7 @@ public class TestDatasetVolumeCheckerFailures {
assertThat(checker.getNumSkippedChecks(), is(1L));
// Re-check after advancing the timer. Ensure the check is performed.
- timer.advance(minGapMs);
+ timer.advance(MIN_DISK_CHECK_GAP_MS);
checker.checkAllVolumes(dataset);
assertThat(checker.getNumSyncDatasetChecks(), is(2L));
assertThat(checker.getNumSkippedChecks(), is(1L));
@@ -124,13 +134,10 @@ public class TestDatasetVolumeCheckerFailures {
@Test(timeout=60000)
public void testMinGapIsEnforcedForASyncChecks() throws Exception {
+ final List<FsVolumeSpi> volumes =
+ TestDatasetVolumeChecker.makeVolumes(1, VolumeCheckResult.HEALTHY);
final FsDatasetSpi<FsVolumeSpi> dataset =
- TestDatasetVolumeChecker.makeDataset(Collections.emptyList());
- final FakeTimer timer = new FakeTimer();
- final Configuration conf = new HdfsConfiguration();
- final long minGapMs = 100;
- conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
- minGapMs, TimeUnit.MILLISECONDS);
+ TestDatasetVolumeChecker.makeDataset(volumes);
final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer);
checker.checkAllVolumesAsync(dataset, null);
@@ -142,7 +149,7 @@ public class TestDatasetVolumeCheckerFailures {
assertThat(checker.getNumSkippedChecks(), is(1L));
// Re-check after advancing the timer. Ensure the check is performed.
- timer.advance(minGapMs);
+ timer.advance(MIN_DISK_CHECK_GAP_MS);
checker.checkAllVolumesAsync(dataset, null);
assertThat(checker.getNumAsyncDatasetChecks(), is(2L));
assertThat(checker.getNumSkippedChecks(), is(1L));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 5cd86e2..62ef731 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -239,8 +239,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
}
@Override
- public Set<StorageLocation> checkDataDir() {
- return null;
+ public void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index e48aae0..905c3f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -52,13 +52,10 @@ import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.FakeTimer;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
@@ -66,8 +63,6 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import java.io.File;
import java.io.FileOutputStream;
@@ -76,16 +71,18 @@ import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
+import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
@@ -94,13 +91,10 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.slf4j.Logger;
@@ -339,68 +333,6 @@ public class TestFsDatasetImpl {
assertEquals(numExistingVolumes, getNumVolumes());
}
- @Test(timeout = 5000)
- public void testChangeVolumeWithRunningCheckDirs() throws IOException {
- RoundRobinVolumeChoosingPolicy<FsVolumeImpl> blockChooser =
- new RoundRobinVolumeChoosingPolicy<>();
- conf.setLong(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
- final BlockScanner blockScanner = new BlockScanner(datanode);
- final FsVolumeList volumeList = new FsVolumeList(
- Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser);
- final List<FsVolumeImpl> oldVolumes = new ArrayList<>();
-
- // Initialize FsVolumeList with 5 mock volumes.
- final int NUM_VOLUMES = 5;
- for (int i = 0; i < NUM_VOLUMES; i++) {
- FsVolumeImpl volume = mock(FsVolumeImpl.class);
- oldVolumes.add(volume);
- when(volume.getStorageLocation()).thenReturn(
- StorageLocation.parse(new File("data" + i).toURI().toString()));
- when(volume.checkClosed()).thenReturn(true);
- FsVolumeReference ref = mock(FsVolumeReference.class);
- when(ref.getVolume()).thenReturn(volume);
- volumeList.addVolume(ref);
- }
-
- // When call checkDirs() on the 2nd volume, anther "thread" removes the 5th
- // volume and add another volume. It does not affect checkDirs() running.
- final FsVolumeImpl newVolume = mock(FsVolumeImpl.class);
- final FsVolumeReference newRef = mock(FsVolumeReference.class);
- when(newRef.getVolume()).thenReturn(newVolume);
- when(newVolume.getStorageLocation()).thenReturn(
- StorageLocation.parse(new File("data4").toURI().toString()));
- FsVolumeImpl blockedVolume = volumeList.getVolumes().get(1);
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock)
- throws Throwable {
- volumeList.removeVolume(
- StorageLocation.parse((new File("data4")).toURI().toString()),
- false);
- volumeList.addVolume(newRef);
- return null;
- }
- }).when(blockedVolume).checkDirs();
-
- FsVolumeImpl brokenVolume = volumeList.getVolumes().get(2);
- doThrow(new DiskChecker.DiskErrorException("broken"))
- .when(brokenVolume).checkDirs();
-
- volumeList.checkDirs();
-
- // Since FsVolumeImpl#checkDirs() get a snapshot of the list of volumes
- // before running removeVolume(), it is supposed to run checkDirs() on all
- // the old volumes.
- for (FsVolumeImpl volume : oldVolumes) {
- verify(volume).checkDirs();
- }
- // New volume is not visible to checkDirs() process.
- verify(newVolume, never()).checkDirs();
- assertTrue(volumeList.getVolumes().contains(newVolume));
- assertFalse(volumeList.getVolumes().contains(brokenVolume));
- assertEquals(NUM_VOLUMES - 1, volumeList.getVolumes().size());
- }
-
@Test
public void testAddVolumeFailureReleasesInUseLock() throws IOException {
FsDatasetImpl spyDataset = spy(dataset);
@@ -717,6 +649,9 @@ public class TestFsDatasetImpl {
Configuration config = new HdfsConfiguration();
config.setLong(
DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, 1000);
+ config.setTimeDuration(
+ DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, 0,
+ TimeUnit.MILLISECONDS);
config.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
@@ -734,6 +669,8 @@ public class TestFsDatasetImpl {
getVolume(block);
File finalizedDir = volume.getFinalizedDir(cluster.getNamesystem()
.getBlockPoolId());
+ LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0);
+ DatanodeInfo info = lb.getLocations()[0];
if (finalizedDir.exists()) {
// Remove write and execute access so that checkDiskErrorThread detects
@@ -744,15 +681,14 @@ public class TestFsDatasetImpl {
Assert.assertTrue("Reference count for the volume should be greater "
+ "than 0", volume.getReferenceCount() > 0);
// Invoke the synchronous checkDiskError method
- dataNode.getFSDataset().checkDataDir();
+ dataNode.checkDiskError();
// Wait until the volume reference is released so the cluster can clean up
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override public Boolean get() {
return volume.getReferenceCount() == 0;
}
}, 100, 10);
- LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0);
- DatanodeInfo info = lb.getLocations()[0];
+ assertThat(dataNode.getFSDataset().getNumFailedVolumes(), is(1));
try {
out.close();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
index 6eff300..83c15ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
@@ -109,43 +109,6 @@ public class TestFsVolumeList {
}
@Test(timeout=30000)
- public void testCheckDirsWithClosedVolume() throws IOException {
- FsVolumeList volumeList = new FsVolumeList(
- Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser);
- final List<FsVolumeImpl> volumes = new ArrayList<>();
- for (int i = 0; i < 3; i++) {
- File curDir = new File(baseDir, "volume-" + i);
- curDir.mkdirs();
- FsVolumeImpl volume = new FsVolumeImplBuilder()
- .setConf(conf)
- .setDataset(dataset)
- .setStorageID("storage-id")
- .setStorageDirectory(
- new StorageDirectory(StorageLocation.parse(curDir.getPath())))
- .build();
- volumes.add(volume);
- volumeList.addVolume(volume.obtainReference());
- }
-
- // Close the 2nd volume.
- volumes.get(1).setClosed();
- try {
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- return volumes.get(1).checkClosed();
- }
- }, 100, 3000);
- } catch (TimeoutException e) {
- fail("timed out while waiting for volume to be removed.");
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- // checkDirs() should ignore the 2nd volume since it is closed.
- volumeList.checkDirs();
- }
-
- @Test(timeout=30000)
public void testReleaseVolumeRefIfNoBlockScanner() throws IOException {
FsVolumeList volumeList = new FsVolumeList(
Collections.<VolumeFailureInfo>emptyList(), null, blockChooser);