Posted to common-commits@hadoop.apache.org by xg...@apache.org on 2016/12/05 18:47:20 UTC
[02/29] hadoop git commit: HDFS-11149. Support for parallel checking of FsVolumes.
HDFS-11149. Support for parallel checking of FsVolumes.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/eaaa3295
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/eaaa3295
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/eaaa3295
Branch: refs/heads/YARN-5734
Commit: eaaa32950cbae42a74e28e3db3f0cdb1ff158119
Parents: 8f6e143
Author: Arpit Agarwal <ar...@apache.org>
Authored: Tue Nov 29 20:31:02 2016 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Tue Nov 29 20:31:02 2016 -0800
----------------------------------------------------------------------
.../datanode/checker/DatasetVolumeChecker.java | 442 +++++++++++++++++++
.../server/datanode/fsdataset/FsDatasetSpi.java | 7 +
.../server/datanode/fsdataset/FsVolumeSpi.java | 12 +-
.../datanode/fsdataset/impl/FsVolumeImpl.java | 15 +-
.../src/main/resources/hdfs-default.xml | 10 +-
.../server/datanode/SimulatedFSDataset.java | 7 +
.../server/datanode/TestDirectoryScanner.java | 7 +
.../checker/TestDatasetVolumeChecker.java | 261 +++++++++++
.../TestDatasetVolumeCheckerFailures.java | 193 ++++++++
.../datanode/extdataset/ExternalVolumeImpl.java | 7 +
10 files changed, 953 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eaaa3295/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
new file mode 100644
index 0000000..8a57812
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
@@ -0,0 +1,442 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.checker;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.VolumeCheckContext;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.nio.channels.ClosedChannelException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY;
+
+/**
+ * A class that encapsulates running disk checks against each volume of an
+ * {@link FsDatasetSpi} and allows retrieving a list of failed volumes.
+ *
+ * This splits out behavior that was originally implemented across
+ * DataNode, FsDatasetImpl and FsVolumeList.
+ */
+public class DatasetVolumeChecker {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(DatasetVolumeChecker.class);
+
+ private AsyncChecker<VolumeCheckContext, VolumeCheckResult> delegateChecker;
+
+ private final AtomicLong numVolumeChecks = new AtomicLong(0);
+ private final AtomicLong numSyncDatasetChecks = new AtomicLong(0);
+ private final AtomicLong numAsyncDatasetChecks = new AtomicLong(0);
+ private final AtomicLong numSkippedChecks = new AtomicLong(0);
+
+ /**
+ * Max allowed time for a disk check in milliseconds. If the check
+ * doesn't complete within this time we declare the disk as dead.
+ */
+ private final long maxAllowedTimeForCheckMs;
+
+ /**
+ * Maximum number of volume failures that can be tolerated without
+ * declaring a fatal error.
+ */
+ private final int maxVolumeFailuresTolerated;
+
+ /**
+ * Minimum time between two successive disk checks of a volume.
+ */
+ private final long minDiskCheckGapMs;
+
+ /**
+ * Timestamp of the last check of all volumes.
+ */
+ private long lastAllVolumesCheck;
+
+ private final Timer timer;
+
+ private static final VolumeCheckContext IGNORED_CONTEXT =
+ new VolumeCheckContext();
+
+ /**
+ * @param conf Configuration object.
+ * @param timer {@link Timer} object used for throttling checks.
+ */
+ public DatasetVolumeChecker(Configuration conf, Timer timer)
+ throws DiskErrorException {
+ maxAllowedTimeForCheckMs = conf.getTimeDuration(
+ DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
+ DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+
+ if (maxAllowedTimeForCheckMs <= 0) {
+ throw new DiskErrorException("Invalid value configured for "
+ + DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY + " - "
+ + maxAllowedTimeForCheckMs + " (should be > 0)");
+ }
+
+ this.timer = timer;
+
+ maxVolumeFailuresTolerated = conf.getInt(
+ DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
+ DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
+
+ minDiskCheckGapMs = conf.getTimeDuration(
+ DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+ DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT,
+ TimeUnit.MILLISECONDS);
+
+ if (minDiskCheckGapMs < 0) {
+ throw new DiskErrorException("Invalid value configured for "
+ + DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY + " - "
+ + minDiskCheckGapMs + " (should be >= 0)");
+ }
+
+ lastAllVolumesCheck = timer.monotonicNow() - minDiskCheckGapMs;
+
+ if (maxVolumeFailuresTolerated < 0) {
+ throw new DiskErrorException("Invalid value configured for "
+ + DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY + " - "
+ + maxVolumeFailuresTolerated + " (should be non-negative)");
+ }
+
+ delegateChecker = new ThrottledAsyncChecker<>(
+ timer, minDiskCheckGapMs, Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder()
+ .setNameFormat("DataNode DiskChecker thread %d")
+ .setDaemon(true)
+ .build()));
+ }
+
+ /**
+ * Run checks against all volumes of a dataset.
+ *
+ * This check may be performed at service startup and subsequently at
+ * regular intervals to detect and handle failed volumes.
+ *
+ * @param dataset - FsDatasetSpi to be checked.
+ * @return set of failed volumes.
+ */
+ public Set<StorageLocation> checkAllVolumes(
+ final FsDatasetSpi<? extends FsVolumeSpi> dataset)
+ throws InterruptedException {
+
+ if (timer.monotonicNow() - lastAllVolumesCheck < minDiskCheckGapMs) {
+ numSkippedChecks.incrementAndGet();
+ return Collections.emptySet();
+ }
+
+ lastAllVolumesCheck = timer.monotonicNow();
+ final Set<StorageLocation> healthyVolumes = new HashSet<>();
+ final Set<StorageLocation> failedVolumes = new HashSet<>();
+ final Set<StorageLocation> allVolumes = new HashSet<>();
+
+ final FsDatasetSpi.FsVolumeReferences references =
+ dataset.getFsVolumeReferences();
+ final CountDownLatch resultsLatch = new CountDownLatch(references.size());
+
+ for (int i = 0; i < references.size(); ++i) {
+ final FsVolumeReference reference = references.getReference(i);
+ allVolumes.add(reference.getVolume().getStorageLocation());
+ ListenableFuture<VolumeCheckResult> future =
+ delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
+ LOG.info("Scheduled health check for volume {}", reference.getVolume());
+ Futures.addCallback(future, new ResultHandler(
+ reference, healthyVolumes, failedVolumes, resultsLatch, null));
+ }
+
+ // Wait until our timeout elapses, after which we give up on
+ // the remaining volumes.
+ if (!resultsLatch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) {
+ LOG.warn("checkAllVolumes timed out after {} ms" +
+ maxAllowedTimeForCheckMs);
+ }
+
+ numSyncDatasetChecks.incrementAndGet();
+ synchronized (this) {
+ // All volumes that have not been detected as healthy should be
+ // considered failed. This is a superset of 'failedVolumes'.
+ //
+ // Make a copy under the mutex as Sets.difference() returns a view
+ // of a potentially changing set.
+ return new HashSet<>(Sets.difference(allVolumes, healthyVolumes));
+ }
+ }
+
+ /**
+ * Start checks against all volumes of a dataset, invoking the
+ * given callback when the operation has completed. The function
+ * does not wait for the checks to complete.
+ *
+ * If a volume cannot be referenced then it is already closed and
+ * cannot be checked. No error is propagated to the callback for that
+ * volume.
+ *
+ * @param dataset - FsDatasetSpi to be checked.
+ * @param callback - Callback to be invoked when the checks are complete.
+ * @return true if the check was scheduled and the callback will be invoked.
+ * false if the check was not scheduled and the callback will not be
+ * invoked.
+ */
+ public boolean checkAllVolumesAsync(
+ final FsDatasetSpi<? extends FsVolumeSpi> dataset,
+ Callback callback) {
+
+ if (timer.monotonicNow() - lastAllVolumesCheck < minDiskCheckGapMs) {
+ numSkippedChecks.incrementAndGet();
+ return false;
+ }
+
+ lastAllVolumesCheck = timer.monotonicNow();
+ final Set<StorageLocation> healthyVolumes = new HashSet<>();
+ final Set<StorageLocation> failedVolumes = new HashSet<>();
+ final FsDatasetSpi.FsVolumeReferences references =
+ dataset.getFsVolumeReferences();
+ final CountDownLatch latch = new CountDownLatch(references.size());
+
+ LOG.info("Checking {} volumes", references.size());
+ for (int i = 0; i < references.size(); ++i) {
+ final FsVolumeReference reference = references.getReference(i);
+ // The context parameter is currently ignored.
+ ListenableFuture<VolumeCheckResult> future =
+ delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
+ Futures.addCallback(future, new ResultHandler(
+ reference, healthyVolumes, failedVolumes, latch, callback));
+ }
+ numAsyncDatasetChecks.incrementAndGet();
+ return true;
+ }
+
+ /**
+ * A callback interface that is supplied the result of running an
+ * async disk check on multiple volumes.
+ */
+ public interface Callback {
+ /**
+ * @param healthyVolumes set of volumes that passed disk checks.
+ * @param failedVolumes set of volumes that failed disk checks.
+ */
+ void call(Set<StorageLocation> healthyVolumes,
+ Set<StorageLocation> failedVolumes);
+ }
+
+ /**
+ * Check a single volume asynchronously, invoking the given
+ * callback when the check completes.
+ *
+ * If the volume cannot be referenced then it is already closed and
+ * cannot be checked. No error is propagated to the callback.
+ *
+ * @param volume the volume that is to be checked.
+ * @param callback callback to be invoked when the volume check completes.
+ */
+ public void checkVolume(
+ final FsVolumeSpi volume,
+ Callback callback) {
+ FsVolumeReference volumeReference;
+ try {
+ volumeReference = volume.obtainReference();
+ } catch (ClosedChannelException e) {
+ // The volume has already been closed.
+ callback.call(new HashSet<>(), new HashSet<>());
+ return;
+ }
+ ListenableFuture<VolumeCheckResult> future =
+ delegateChecker.schedule(volume, IGNORED_CONTEXT);
+ numVolumeChecks.incrementAndGet();
+ Futures.addCallback(future, new ResultHandler(
+ volumeReference, new HashSet<>(), new HashSet<>(),
+ new CountDownLatch(1), callback));
+ }
+
+ /**
+ * A callback to process the results of checking a volume.
+ */
+ private class ResultHandler
+ implements FutureCallback<VolumeCheckResult> {
+ private final FsVolumeReference reference;
+ private final Set<StorageLocation> failedVolumes;
+ private final Set<StorageLocation> healthyVolumes;
+ private final CountDownLatch latch;
+ private final AtomicLong numVolumes;
+
+ @Nullable
+ private final Callback callback;
+
+ ResultHandler(FsVolumeReference reference,
+ Set<StorageLocation> healthyVolumes,
+ Set<StorageLocation> failedVolumes,
+ CountDownLatch latch,
+ @Nullable Callback callback) {
+ Preconditions.checkState(reference != null);
+ this.reference = reference;
+ this.healthyVolumes = healthyVolumes;
+ this.failedVolumes = failedVolumes;
+ this.latch = latch;
+ this.callback = callback;
+ numVolumes = new AtomicLong(latch.getCount());
+ }
+
+ @Override
+ public void onSuccess(@Nonnull VolumeCheckResult result) {
+ switch(result) {
+ case HEALTHY:
+ case DEGRADED:
+ LOG.debug("Volume {} is {}.", reference.getVolume(), result);
+ markHealthy();
+ break;
+ case FAILED:
+ LOG.warn("Volume {} detected as being unhealthy",
+ reference.getVolume());
+ markFailed();
+ break;
+ default:
+ LOG.error("Unexpected health check result {} for volume {}",
+ result, reference.getVolume());
+ markHealthy();
+ break;
+ }
+ cleanup();
+ }
+
+ @Override
+ public void onFailure(@Nonnull Throwable t) {
+ Throwable exception = (t instanceof ExecutionException) ?
+ t.getCause() : t;
+ LOG.warn("Exception running disk checks against volume " +
+ reference.getVolume(), exception);
+ markFailed();
+ cleanup();
+ }
+
+ private void markHealthy() {
+ synchronized (DatasetVolumeChecker.this) {
+ healthyVolumes.add(reference.getVolume().getStorageLocation());
+ }
+ }
+
+ private void markFailed() {
+ synchronized (DatasetVolumeChecker.this) {
+ failedVolumes.add(reference.getVolume().getStorageLocation());
+ }
+ }
+
+ private void cleanup() {
+ IOUtils.cleanup(null, reference);
+ invokeCallback();
+ }
+
+ private void invokeCallback() {
+ try {
+ latch.countDown();
+
+ if (numVolumes.decrementAndGet() == 0 &&
+ callback != null) {
+ callback.call(healthyVolumes, failedVolumes);
+ }
+ } catch(Exception e) {
+ // Propagating this exception is unlikely to be helpful.
+ LOG.warn("Unexpected exception", e);
+ }
+ }
+ }
+
+ /**
+ * Shutdown the checker and its associated ExecutorService.
+ *
+ * See {@link ExecutorService#awaitTermination} for the interpretation
+ * of the parameters.
+ */
+ public void shutdownAndWait(int gracePeriod, TimeUnit timeUnit) {
+ try {
+ delegateChecker.shutdownAndWait(gracePeriod, timeUnit);
+ } catch (InterruptedException e) {
+ LOG.warn("DatasetVolumeChecker interrupted during shutdown.");
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * This method is for testing only.
+ *
+ * @param testDelegate the checker to substitute for the default delegate.
+ */
+ @VisibleForTesting
+ void setDelegateChecker(
+ AsyncChecker<VolumeCheckContext, VolumeCheckResult> testDelegate) {
+ delegateChecker = testDelegate;
+ }
+
+ /**
+ * Return the number of {@link #checkVolume} invocations.
+ */
+ public long getNumVolumeChecks() {
+ return numVolumeChecks.get();
+ }
+
+ /**
+ * Return the number of {@link #checkAllVolumes} invocations.
+ */
+ public long getNumSyncDatasetChecks() {
+ return numSyncDatasetChecks.get();
+ }
+
+ /**
+ * Return the number of {@link #checkAllVolumesAsync} invocations.
+ */
+ public long getNumAsyncDatasetChecks() {
+ return numAsyncDatasetChecks.get();
+ }
+
+ /**
+ * Return the number of checks skipped because the minimum gap since the
+ * last check had not elapsed.
+ */
+ public long getNumSkippedChecks() {
+ return numSkippedChecks.get();
+ }
+}
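A minimal usage sketch of the new API for readers following along. The
setup below (the caller class and method are illustrative, not part of this
commit); the checker calls match the signatures added above:

    import java.util.Set;
    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
    import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
    import org.apache.hadoop.util.Timer;

    class VolumeCheckExample {
      static void runChecks(FsDatasetSpi<? extends FsVolumeSpi> dataset)
          throws Exception {
        DatasetVolumeChecker checker =
            new DatasetVolumeChecker(new HdfsConfiguration(), new Timer());

        // Synchronous variant: blocks for up to
        // dfs.datanode.disk.check.timeout and returns every volume that
        // was not confirmed healthy within that window.
        Set<StorageLocation> failed = checker.checkAllVolumes(dataset);

        // Asynchronous variant: returns immediately; the callback fires
        // once all scheduled checks complete. A false return means the
        // minimum check gap had not elapsed and nothing was scheduled.
        boolean scheduled = checker.checkAllVolumesAsync(dataset,
            new DatasetVolumeChecker.Callback() {
              @Override
              public void call(Set<StorageLocation> healthyVolumes,
                               Set<StorageLocation> failedVolumes) {
                // React to failed volumes here.
              }
            });

        // Release the checker's thread pool on shutdown.
        checker.shutdownAndWait(1, TimeUnit.SECONDS);
      }
    }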
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eaaa3295/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 547392f..57ec2b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -159,6 +159,13 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
return references.get(index).getVolume();
}
+ /**
+ * Get the reference for a given index.
+ */
+ public FsVolumeReference getReference(int index) {
+ return references.get(index);
+ }
+
@Override
public void close() throws IOException {
IOException ioe = null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eaaa3295/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index dbba31d..a11a207 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -34,11 +34,15 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
/**
* This is an interface for the underlying volume.
*/
-public interface FsVolumeSpi {
+public interface FsVolumeSpi
+ extends Checkable<FsVolumeSpi.VolumeCheckContext, VolumeCheckResult> {
+
/**
* Obtain a reference object that had increased 1 reference count of the
* volume.
@@ -408,4 +412,10 @@ public interface FsVolumeSpi {
LinkedList<ScanInfo> compileReport(String bpid,
LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
throws InterruptedException, IOException;
+
+ /**
+ * Context for the {@link #check} call.
+ */
+ class VolumeCheckContext {
+ }
}
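FsVolumeSpi now extends Checkable, which is not included in this diff (it
comes from the earlier async checker patches). Inferred from its usage in
this commit — target.check(context) in the tests and the
check(VolumeCheckContext) overrides below — the contract is roughly the
following sketch; the authoritative definitions live in
org.apache.hadoop.hdfs.server.datanode.checker:

    // Sketch of the assumed Checkable contract; not verbatim from the
    // tree. K is the context type, V the result type, and each type
    // would sit in its own source file.
    public interface Checkable<K, V> {
      // Run a check against this object, optionally using the supplied
      // context, and return the outcome; may throw if the check fails.
      V check(K context) throws Exception;
    }

    // Result values as consumed by DatasetVolumeChecker.ResultHandler:
    // HEALTHY and DEGRADED are both recorded as healthy, while FAILED
    // marks the volume failed.
    enum VolumeCheckResult {
      HEALTHY,
      DEGRADED,
      FAILED
    }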
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eaaa3295/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 5880b3e..a231e03 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
@@ -69,7 +71,6 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.BlockDirFilter;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
@@ -914,7 +915,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
@Override
- public FsDatasetSpi getDataset() {
+ public FsDatasetSpi<? extends FsVolumeSpi> getDataset() {
return dataset;
}
@@ -962,6 +963,16 @@ public class FsVolumeImpl implements FsVolumeSpi {
s.checkDirs();
}
}
+
+ @Override
+ public VolumeCheckResult check(VolumeCheckContext ignored)
+ throws DiskErrorException {
+ // TODO:FEDERATION valid synchronization
+ for(BlockPoolSlice s : bpSlices.values()) {
+ s.checkDirs();
+ }
+ return VolumeCheckResult.HEALTHY;
+ }
void getVolumeMap(ReplicaMap volumeMap,
final RamDiskReplicaTracker ramDiskReplicaMap)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eaaa3295/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index c9d74bb..671c98c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4376,11 +4376,11 @@
<name>dfs.datanode.disk.check.timeout</name>
<value>10m</value>
<description>
- Maximum allowed time for a disk check to complete. If the check does
- not complete within this time interval then the disk is declared as
- failed. This setting supports multiple time unit suffixes as described
- in dfs.heartbeat.interval. If no suffix is specified then milliseconds
- is assumed.
+ Maximum allowed time for a disk check to complete during DataNode
+ startup. If the check does not complete within this time interval
+ then the disk is declared as failed. This setting supports
+ multiple time unit suffixes as described in dfs.heartbeat.interval.
+ If no suffix is specified then milliseconds is assumed.
</description>
</property>
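The tests added by this patch set the same keys programmatically instead of
editing hdfs-default.xml; a short sketch of that pattern (the class and
method names are illustrative, the calls mirror the tests below):

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    class DiskCheckConfigExample {
      static Configuration tightTimeouts() {
        Configuration conf = new HdfsConfiguration();
        // Declare a hung disk check failed after 1s instead of 10m.
        conf.setTimeDuration(
            DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
            1, TimeUnit.SECONDS);
        // Throttle repeat checks of the same volume to one per 100 ms.
        conf.setTimeDuration(
            DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
            100, TimeUnit.MILLISECONDS);
        return conf;
      }
    }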
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eaaa3295/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 8e6191a..5d63d07 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -42,6 +42,7 @@ import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -557,6 +558,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
throws InterruptedException, IOException {
return null;
}
+
+ @Override
+ public VolumeCheckResult check(VolumeCheckContext context)
+ throws Exception {
+ return VolumeCheckResult.HEALTHY;
+ }
}
private final Map<String, Map<Block, BInfo>> blockMap
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eaaa3295/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index d05e2a7..f08b579 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -897,6 +898,12 @@ public class TestDirectoryScanner {
return null;
}
+
+ @Override
+ public VolumeCheckResult check(VolumeCheckContext context)
+ throws Exception {
+ return VolumeCheckResult.HEALTHY;
+ }
}
private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eaaa3295/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
new file mode 100644
index 0000000..fa809d1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.checker;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.VolumeCheckContext;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.FakeTimer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult.*;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Tests for {@link DatasetVolumeChecker} when the {@link FsVolumeSpi#check}
+ * method returns different values of {@link VolumeCheckResult}.
+ */
+@RunWith(Parameterized.class)
+public class TestDatasetVolumeChecker {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(TestDatasetVolumeChecker.class);
+
+ @Rule
+ public TestName testName = new TestName();
+
+ /**
+ * Run each test case for each possible value of {@link VolumeCheckResult},
+ * including null, which signifies "the check throws an exception".
+ * @return the parameter combinations to run with.
+ */
+ @Parameters(name="{0}")
+ public static Collection<Object[]> data() {
+ List<Object[]> values = new ArrayList<>();
+ for (VolumeCheckResult result : VolumeCheckResult.values()) {
+ values.add(new Object[] {result});
+ }
+ values.add(new Object[] {null});
+ return values;
+ }
+
+ /**
+ * When null, the check call should throw an exception.
+ */
+ private final VolumeCheckResult expectedVolumeHealth;
+ private static final int NUM_VOLUMES = 2;
+
+
+ public TestDatasetVolumeChecker(VolumeCheckResult expectedVolumeHealth) {
+ this.expectedVolumeHealth = expectedVolumeHealth;
+ }
+
+ /**
+ * Test that {@link DatasetVolumeChecker#checkVolume} propagates the
+ * check to the delegate checker.
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 10000)
+ public void testCheckOneVolume() throws Exception {
+ LOG.info("Executing {}", testName.getMethodName());
+ final FsVolumeSpi volume = makeVolumes(1, expectedVolumeHealth).get(0);
+ final DatasetVolumeChecker checker =
+ new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer());
+ checker.setDelegateChecker(new DummyChecker());
+ final AtomicLong numCallbackInvocations = new AtomicLong(0);
+
+ /**
+ * Request a check and ensure it triggered {@link FsVolumeSpi#check}.
+ */
+ checker.checkVolume(volume, new DatasetVolumeChecker.Callback() {
+ @Override
+ public void call(Set<StorageLocation> healthyVolumes,
+ Set<StorageLocation> failedVolumes) {
+ numCallbackInvocations.incrementAndGet();
+ if (expectedVolumeHealth != null && expectedVolumeHealth != FAILED) {
+ assertThat(healthyVolumes.size(), is(1));
+ assertThat(failedVolumes.size(), is(0));
+ } else {
+ assertThat(healthyVolumes.size(), is(0));
+ assertThat(failedVolumes.size(), is(1));
+ }
+ }
+ });
+
+ // Ensure that the check was invoked at least once.
+ verify(volume, times(1)).check(anyObject());
+ assertThat(numCallbackInvocations.get(), is(1L));
+ }
+
+ /**
+ * Test that {@link DatasetVolumeChecker#checkAllVolumes} propagates
+ * checks for all volumes to the delegate checker.
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 10000)
+ public void testCheckAllVolumes() throws Exception {
+ LOG.info("Executing {}", testName.getMethodName());
+
+ final List<FsVolumeSpi> volumes = makeVolumes(
+ NUM_VOLUMES, expectedVolumeHealth);
+ final FsDatasetSpi<FsVolumeSpi> dataset = makeDataset(volumes);
+ final DatasetVolumeChecker checker =
+ new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer());
+ checker.setDelegateChecker(new DummyChecker());
+
+ Set<StorageLocation> failedVolumes = checker.checkAllVolumes(dataset);
+ LOG.info("Got back {} failed volumes", failedVolumes.size());
+
+ if (expectedVolumeHealth == null || expectedVolumeHealth == FAILED) {
+ assertThat(failedVolumes.size(), is(NUM_VOLUMES));
+ } else {
+ assertTrue(failedVolumes.isEmpty());
+ }
+
+ // Ensure each volume's check() method was called exactly once.
+ for (FsVolumeSpi volume : volumes) {
+ verify(volume, times(1)).check(anyObject());
+ }
+ }
+
+ /**
+ * Unit test for {@link DatasetVolumeChecker#checkAllVolumesAsync}.
+ *
+ * @throws Exception
+ */
+ @Test(timeout=10000)
+ public void testCheckAllVolumesAsync() throws Exception {
+ LOG.info("Executing {}", testName.getMethodName());
+
+ final List<FsVolumeSpi> volumes = makeVolumes(
+ NUM_VOLUMES, expectedVolumeHealth);
+ final FsDatasetSpi<FsVolumeSpi> dataset = makeDataset(volumes);
+ final DatasetVolumeChecker checker =
+ new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer());
+ checker.setDelegateChecker(new DummyChecker());
+ final AtomicLong numCallbackInvocations = new AtomicLong(0);
+
+ checker.checkAllVolumesAsync(
+ dataset, new DatasetVolumeChecker.Callback() {
+ @Override
+ public void call(
+ Set<StorageLocation> healthyVolumes,
+ Set<StorageLocation> failedVolumes) {
+ LOG.info("Got back {} failed volumes", failedVolumes.size());
+ if (expectedVolumeHealth == null ||
+ expectedVolumeHealth == FAILED) {
+ assertThat(healthyVolumes.size(), is(0));
+ assertThat(failedVolumes.size(), is(NUM_VOLUMES));
+ } else {
+ assertThat(healthyVolumes.size(), is(NUM_VOLUMES));
+ assertThat(failedVolumes.size(), is(0));
+ }
+ numCallbackInvocations.incrementAndGet();
+ }
+ });
+
+ // The callback should be invoked exactly once.
+ assertThat(numCallbackInvocations.get(), is(1L));
+
+ // Ensure each volume's check() method was called exactly once.
+ for (FsVolumeSpi volume : volumes) {
+ verify(volume, times(1)).check(anyObject());
+ }
+ }
+
+ /**
+ * A checker that wraps the result of {@link FsVolumeSpi#check} in
+ * an immediate future.
+ */
+ static class DummyChecker
+ implements AsyncChecker<VolumeCheckContext, VolumeCheckResult> {
+ @Override
+ public ListenableFuture<VolumeCheckResult> schedule(
+ Checkable<VolumeCheckContext, VolumeCheckResult> target,
+ VolumeCheckContext context) {
+ try {
+ return Futures.immediateFuture(target.check(context));
+ } catch (Exception e) {
+ LOG.info("check routine threw exception " + e);
+ return Futures.immediateFailedFuture(e);
+ }
+ }
+
+ @Override
+ public void shutdownAndWait(long timeout, TimeUnit timeUnit)
+ throws InterruptedException {
+ // Nothing to cancel.
+ }
+ }
+
+ /**
+ * Create a dataset with the given volumes.
+ */
+ static FsDatasetSpi<FsVolumeSpi> makeDataset(List<FsVolumeSpi> volumes)
+ throws Exception {
+ // Create dataset and init volume health.
+ final FsDatasetSpi<FsVolumeSpi> dataset = mock(FsDatasetSpi.class);
+ final FsDatasetSpi.FsVolumeReferences references = new
+ FsDatasetSpi.FsVolumeReferences(volumes);
+ when(dataset.getFsVolumeReferences()).thenReturn(references);
+ return dataset;
+ }
+
+ private static List<FsVolumeSpi> makeVolumes(
+ int numVolumes, VolumeCheckResult health) throws Exception {
+ final List<FsVolumeSpi> volumes = new ArrayList<>(numVolumes);
+ for (int i = 0; i < numVolumes; ++i) {
+ final FsVolumeSpi volume = mock(FsVolumeSpi.class);
+ final FsVolumeReference reference = mock(FsVolumeReference.class);
+ final StorageLocation location = mock(StorageLocation.class);
+
+ when(reference.getVolume()).thenReturn(volume);
+ when(volume.obtainReference()).thenReturn(reference);
+ when(volume.getStorageLocation()).thenReturn(location);
+
+ if (health != null) {
+ when(volume.check(anyObject())).thenReturn(health);
+ } else {
+ final DiskErrorException de = new DiskErrorException("Fake Exception");
+ when(volume.check(anyObject())).thenThrow(de);
+ }
+ volumes.add(volume);
+ }
+ return volumes;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eaaa3295/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
new file mode 100644
index 0000000..b57d84f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.checker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
+import org.apache.hadoop.util.FakeTimer;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.TimeUnit;
+import java.util.*;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Test a few more conditions not covered by TestDatasetVolumeChecker.
+ */
+public class TestDatasetVolumeCheckerFailures {
+ public static final Logger LOG = LoggerFactory.getLogger(
+ TestDatasetVolumeCheckerFailures.class);
+
+ /**
+ * Test timeout in {@link DatasetVolumeChecker#checkAllVolumes}.
+ * @throws Exception
+ */
+ @Test(timeout=60000)
+ public void testTimeout() throws Exception {
+ // Add a volume whose check routine hangs forever.
+ final List<FsVolumeSpi> volumes =
+ Collections.singletonList(makeHungVolume());
+
+ final FsDatasetSpi<FsVolumeSpi> dataset =
+ TestDatasetVolumeChecker.makeDataset(volumes);
+
+ // Create a disk checker with a very low timeout.
+ final HdfsConfiguration conf = new HdfsConfiguration();
+ conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
+ 1, TimeUnit.SECONDS);
+ final DatasetVolumeChecker checker =
+ new DatasetVolumeChecker(conf, new FakeTimer());
+
+ // Ensure that the hung volume is detected as failed.
+ Set<StorageLocation> failedVolumes = checker.checkAllVolumes(dataset);
+ assertThat(failedVolumes.size(), is(1));
+ }
+
+ /**
+ * Test checking a closed volume, i.e. one which cannot be referenced.
+ *
+ * @throws Exception
+ */
+ @Test(timeout=60000)
+ public void testCheckingClosedVolume() throws Exception {
+ // Add a volume that cannot be referenced.
+ final List<FsVolumeSpi> volumes =
+ Collections.singletonList(makeClosedVolume());
+
+ final FsDatasetSpi<FsVolumeSpi> dataset =
+ TestDatasetVolumeChecker.makeDataset(volumes);
+
+ DatasetVolumeChecker checker =
+ new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer());
+ Set<StorageLocation> failedVolumes = checker.checkAllVolumes(dataset);
+ assertThat(failedVolumes.size(), is(0));
+
+ // The closed volume should not have been checked as it cannot
+ // be referenced.
+ verify(volumes.get(0), times(0)).check(anyObject());
+ }
+
+ @Test(timeout=60000)
+ public void testMinGapIsEnforcedForSyncChecks() throws Exception {
+ final FsDatasetSpi<FsVolumeSpi> dataset =
+ TestDatasetVolumeChecker.makeDataset(Collections.emptyList());
+ final FakeTimer timer = new FakeTimer();
+ final Configuration conf = new HdfsConfiguration();
+ final long minGapMs = 100;
+ conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+ minGapMs, TimeUnit.MILLISECONDS);
+ final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer);
+
+ checker.checkAllVolumes(dataset);
+ assertThat(checker.getNumSyncDatasetChecks(), is(1L));
+
+ // Re-check without advancing the timer. Ensure the check is skipped.
+ checker.checkAllVolumes(dataset);
+ assertThat(checker.getNumSyncDatasetChecks(), is(1L));
+ assertThat(checker.getNumSkippedChecks(), is(1L));
+
+ // Re-check after advancing the timer. Ensure the check is performed.
+ timer.advance(minGapMs);
+ checker.checkAllVolumes(dataset);
+ assertThat(checker.getNumSyncDatasetChecks(), is(2L));
+ assertThat(checker.getNumSkippedChecks(), is(1L));
+ }
+
+ @Test(timeout=60000)
+ public void testMinGapIsEnforcedForASyncChecks() throws Exception {
+ final FsDatasetSpi<FsVolumeSpi> dataset =
+ TestDatasetVolumeChecker.makeDataset(Collections.emptyList());
+ final FakeTimer timer = new FakeTimer();
+ final Configuration conf = new HdfsConfiguration();
+ final long minGapMs = 100;
+ conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+ minGapMs, TimeUnit.MILLISECONDS);
+ final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer);
+
+ checker.checkAllVolumesAsync(dataset, null);
+ assertThat(checker.getNumAsyncDatasetChecks(), is(1L));
+
+ // Re-check without advancing the timer. Ensure the check is skipped.
+ checker.checkAllVolumesAsync(dataset, null);
+ assertThat(checker.getNumAsyncDatasetChecks(), is(1L));
+ assertThat(checker.getNumSkippedChecks(), is(1L));
+
+ // Re-check after advancing the timer. Ensure the check is performed.
+ timer.advance(minGapMs);
+ checker.checkAllVolumesAsync(dataset, null);
+ assertThat(checker.getNumAsyncDatasetChecks(), is(2L));
+ assertThat(checker.getNumSkippedChecks(), is(1L));
+ }
+
+ /**
+ * Create a mock FsVolumeSpi whose {@link FsVolumeSpi#check} routine
+ * hangs forever.
+ *
+ * @return volume
+ * @throws Exception
+ */
+ private static FsVolumeSpi makeHungVolume() throws Exception {
+ final FsVolumeSpi volume = mock(FsVolumeSpi.class);
+ final FsVolumeReference reference = mock(FsVolumeReference.class);
+ final StorageLocation location = mock(StorageLocation.class);
+
+ when(reference.getVolume()).thenReturn(volume);
+ when(volume.obtainReference()).thenReturn(reference);
+ when(volume.getStorageLocation()).thenReturn(location);
+ when(volume.check(anyObject())).thenAnswer(
+ new Answer<VolumeCheckResult>() {
+ @Override
+ public VolumeCheckResult answer(InvocationOnMock invocation)
+ throws Throwable {
+ Thread.sleep(Long.MAX_VALUE); // Sleep forever.
+ return VolumeCheckResult.HEALTHY; // unreachable.
+ }
+ });
+ return volume;
+ }
+
+ /**
+ * Create a mock FsVolumeSpi which is closed and hence cannot
+ * be referenced.
+ *
+ * @return volume
+ * @throws Exception
+ */
+ private static FsVolumeSpi makeClosedVolume() throws Exception {
+ final FsVolumeSpi volume = mock(FsVolumeSpi.class);
+ final StorageLocation location = mock(StorageLocation.class);
+
+ when(volume.obtainReference()).thenThrow(new ClosedChannelException());
+ when(volume.getStorageLocation()).thenReturn(location);
+ return volume;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eaaa3295/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
index 83d6c4c..2753a61 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -112,4 +113,10 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
throws InterruptedException, IOException {
return null;
}
+
+ @Override
+ public VolumeCheckResult check(VolumeCheckContext context)
+ throws Exception {
+ return VolumeCheckResult.HEALTHY;
+ }
}