Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/12/29 06:08:45 UTC
hadoop git commit: HDFS-11274. Datanode should only check the failed volume upon IO errors. Contributed by Xiaoyu Yao.
Repository: hadoop
Updated Branches:
refs/heads/trunk ce3613c96 -> 603f3ef13
HDFS-11274. Datanode should only check the failed volume upon IO errors. Contributed by Xiaoyu Yao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/603f3ef1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/603f3ef1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/603f3ef1
Branch: refs/heads/trunk
Commit: 603f3ef1386048111940b66f3a0750ab84d0588f
Parents: ce3613c
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Wed Dec 28 22:08:13 2016 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Wed Dec 28 22:08:13 2016 -0800
----------------------------------------------------------------------
.../hdfs/server/datanode/BlockReceiver.java | 12 +-
.../server/datanode/CountingFileIoEvents.java | 3 +-
.../hadoop/hdfs/server/datanode/DataNode.java | 91 ++++++++-----
.../server/datanode/DefaultFileIoEvents.java | 2 +-
.../hdfs/server/datanode/FileIoEvents.java | 36 ++++--
.../hdfs/server/datanode/FileIoProvider.java | 89 +++++++------
.../server/datanode/ProfilingFileIoEvents.java | 2 +-
.../hdfs/server/datanode/ReplicaInfo.java | 2 +-
.../server/datanode/checker/AsyncChecker.java | 5 +-
.../datanode/checker/DatasetVolumeChecker.java | 71 ++++++----
.../checker/StorageLocationChecker.java | 8 +-
.../datanode/checker/ThrottledAsyncChecker.java | 19 ++-
.../datanode/fsdataset/impl/BlockPoolSlice.java | 5 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 2 +-
.../datanode/fsdataset/impl/FsVolumeImpl.java | 3 +-
.../fsdataset/impl/FsVolumeImplBuilder.java | 4 +-
.../server/datanode/SimulatedFSDataset.java | 2 +-
.../datanode/TestDataNodeHotSwapVolumes.java | 2 +-
.../TestDataNodeVolumeFailureReporting.java | 15 ++-
.../checker/TestDatasetVolumeChecker.java | 49 ++++---
.../TestDatasetVolumeCheckerFailures.java | 23 ----
.../checker/TestThrottledAsyncChecker.java | 128 +++++++++++--------
22 files changed, 338 insertions(+), 235 deletions(-)
----------------------------------------------------------------------
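The gist of the change: call sites such as BlockReceiver no longer react to an IOException by scheduling a check of every volume; instead, FileIoProvider's failure hook schedules an async check of only the volume that failed. A minimal, self-contained sketch of the new flow (all types here are stubs named for illustration; only the call shape mirrors the patch):

    interface FsVolumeSpi { }  // stub for the real volume interface

    class DataNodeStub {
      // New overload in this patch: check only the failed volume.
      void checkDiskErrorAsync(FsVolumeSpi volume) {
        System.out.println("async check scheduled for volume: " + volume);
      }
    }

    class FileIoProviderStub {
      private final DataNodeStub datanode = new DataNodeStub();

      // Every wrapped IO operation in the patch follows this pattern:
      // on failure, the event hook receives the (datanode, volume) pair
      // and triggers the per-volume check before rethrowing.
      void flush(FsVolumeSpi volume, java.io.Flushable f)
          throws java.io.IOException {
        try {
          f.flush();
        } catch (java.io.IOException e) {
          datanode.checkDiskErrorAsync(volume); // was: check all volumes
          throw e;
        }
      }
    }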
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index b3aee11..567597d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -278,10 +278,9 @@ class BlockReceiver implements Closeable {
IOException cause = DatanodeUtil.getCauseIfDiskError(ioe);
DataNode.LOG.warn("IOException in BlockReceiver constructor"
+ (cause == null ? "" : ". Cause is "), cause);
-
- if (cause != null) { // possible disk error
+ if (cause != null) {
ioe = cause;
- datanode.checkDiskErrorAsync();
+ // Volume error check moved to FileIoProvider
}
throw ioe;
@@ -363,9 +362,8 @@ class BlockReceiver implements Closeable {
if (measuredFlushTime) {
datanode.metrics.addFlushNanos(flushTotalNanos);
}
- // disk check
if(ioe != null) {
- datanode.checkDiskErrorAsync();
+ // Volume error check moved to FileIoProvider
throw ioe;
}
}
@@ -792,7 +790,7 @@ class BlockReceiver implements Closeable {
manageWriterOsCache(offsetInBlock);
}
} catch (IOException iex) {
- datanode.checkDiskErrorAsync();
+ // Volume error check moved to FileIoProvider
throw iex;
}
}
@@ -1430,7 +1428,7 @@ class BlockReceiver implements Closeable {
} catch (IOException e) {
LOG.warn("IOException in BlockReceiver.run(): ", e);
if (running) {
- datanode.checkDiskErrorAsync();
+ // Volume error check moved to FileIoProvider
LOG.info(myString, e);
running = false;
if (!Thread.interrupted()) { // failure not caused by interruption
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
index a70c151..7c6bfd6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class CountingFileIoEvents implements FileIoEvents {
+public class CountingFileIoEvents extends FileIoEvents {
private final Map<OPERATION, Counts> counts;
private static class Counts {
@@ -90,7 +90,6 @@ public class CountingFileIoEvents implements FileIoEvents {
public void onFailure(
@Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
counts.get(op).failures.incrementAndGet();
-
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 4436e58..e893c5e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -372,6 +372,7 @@ public class DataNode extends ReconfigurableBase
SaslDataTransferClient saslClient;
SaslDataTransferServer saslServer;
private ObjectName dataNodeInfoBeanName;
+ // Test verification only
private volatile long lastDiskErrorCheck;
private String supergroup;
private boolean isPermissionEnabled;
@@ -412,7 +413,7 @@ public class DataNode extends ReconfigurableBase
this.tracer = createTracer(conf);
this.tracerConfigurationManager =
new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
- this.fileIoProvider = new FileIoProvider(conf);
+ this.fileIoProvider = new FileIoProvider(conf, this);
this.fileDescriptorPassingDisabledReason = null;
this.maxNumberOfBlocksToLog = 0;
this.confVersion = null;
@@ -438,7 +439,7 @@ public class DataNode extends ReconfigurableBase
this.tracer = createTracer(conf);
this.tracerConfigurationManager =
new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
- this.fileIoProvider = new FileIoProvider(conf);
+ this.fileIoProvider = new FileIoProvider(conf, this);
this.blockScanner = new BlockScanner(this);
this.lastDiskErrorCheck = 0;
this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
@@ -786,7 +787,7 @@ public class DataNode extends ReconfigurableBase
/**
* Remove volumes from DataNode.
- * See {@link #removeVolumes(Set, boolean)} for details.
+ * See {@link #removeVolumes(Collection, boolean)} for details.
*
* @param locations the StorageLocations of the volumes to be removed.
* @throws IOException
@@ -809,7 +810,7 @@ public class DataNode extends ReconfigurableBase
* <ul>Reset configuration DATA_DIR and {@link #dataDirs} to represent
* active volumes.</ul>
* </li>
- * @param absoluteVolumePaths the absolute path of volumes.
+ * @param storageLocations the absolute path of volumes.
* @param clearFailure if true, clears the failure information related to the
* volumes.
* @throws IOException
@@ -1293,7 +1294,7 @@ public class DataNode extends ReconfigurableBase
* If conf's CONFIG_PROPERTY_SIMULATED property is set
* then a simulated storage based data node is created.
*
- * @param dataDirs - only for a non-simulated storage data node
+ * @param dataDirectories - only for a non-simulated storage data node
* @throws IOException
*/
void startDataNode(List<StorageLocation> dataDirectories,
@@ -2045,14 +2046,33 @@ public class DataNode extends ReconfigurableBase
}
tracer.close();
}
-
-
+
/**
- * Check if there is a disk failure asynchronously and if so, handle the error
+ * Check if there is a disk failure asynchronously
+ * and if so, handle the error.
*/
+ @VisibleForTesting
public void checkDiskErrorAsync() {
volumeChecker.checkAllVolumesAsync(
data, (healthyVolumes, failedVolumes) -> {
+ if (failedVolumes.size() > 0) {
+ LOG.warn("checkDiskErrorAsync callback got {} failed volumes: {}",
+ failedVolumes.size(), failedVolumes);
+ } else {
+ LOG.debug("checkDiskErrorAsync: no volume failures detected");
+ }
+ lastDiskErrorCheck = Time.monotonicNow();
+ handleVolumeFailures(failedVolumes);
+ });
+ }
+
+ /**
+ * Check if there is a disk failure asynchronously
+ * and if so, handle the error.
+ */
+ public void checkDiskErrorAsync(FsVolumeSpi volume) {
+ volumeChecker.checkVolume(
+ volume, (healthyVolumes, failedVolumes) -> {
if (failedVolumes.size() > 0) {
LOG.warn("checkDiskErrorAsync callback got {} failed volumes: {}",
failedVolumes.size(), failedVolumes);
@@ -2064,9 +2084,10 @@ public class DataNode extends ReconfigurableBase
});
}
- private void handleDiskError(String errMsgr) {
+ private void handleDiskError(String failedVolumes) {
final boolean hasEnoughResources = data.hasEnoughResource();
- LOG.warn("DataNode.handleDiskError: Keep Running: " + hasEnoughResources);
+ LOG.warn("DataNode.handleDiskError on : [" + failedVolumes +
+ "] Keep Running: " + hasEnoughResources);
// If we have enough active valid volumes then we do not want to
// shutdown the DN completely.
@@ -2076,7 +2097,7 @@ public class DataNode extends ReconfigurableBase
//inform NameNodes
for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) {
- bpos.trySendErrorReport(dpError, errMsgr);
+ bpos.trySendErrorReport(dpError, failedVolumes);
}
if(hasEnoughResources) {
@@ -2084,7 +2105,8 @@ public class DataNode extends ReconfigurableBase
return; // do not shutdown
}
- LOG.warn("DataNode is shutting down: " + errMsgr);
+ LOG.warn("DataNode is shutting down due to failed volumes: ["
+ + failedVolumes + "]");
shouldRun = false;
}
@@ -2447,8 +2469,11 @@ public class DataNode extends ReconfigurableBase
}
LOG.warn(bpReg + ":Failed to transfer " + b + " to " +
targets[0] + " got ", ie);
- // check if there are any disk problem
- checkDiskErrorAsync();
+ // disk check moved to FileIoProvider
+ IOException cause = DatanodeUtil.getCauseIfDiskError(ie);
+ if (cause != null) { // possible disk error
+ LOG.warn("IOException in DataTransfer#run(). Cause is ", cause);
+ }
} finally {
decrementXmitsInProgress();
IOUtils.closeStream(blockSender);
@@ -3234,29 +3259,37 @@ public class DataNode extends ReconfigurableBase
}
private void handleVolumeFailures(Set<FsVolumeSpi> unhealthyVolumes) {
+ if (unhealthyVolumes.isEmpty()) {
+ LOG.debug("handleVolumeFailures done with empty " +
+ "unhealthyVolumes");
+ return;
+ }
+
data.handleVolumeFailures(unhealthyVolumes);
Set<StorageLocation> unhealthyLocations = new HashSet<>(
unhealthyVolumes.size());
- if (!unhealthyVolumes.isEmpty()) {
- StringBuilder sb = new StringBuilder("DataNode failed volumes:");
- for (FsVolumeSpi vol : unhealthyVolumes) {
- unhealthyLocations.add(vol.getStorageLocation());
- sb.append(vol.getStorageLocation()).append(";");
- }
+ StringBuilder sb = new StringBuilder("DataNode failed volumes:");
+ for (FsVolumeSpi vol : unhealthyVolumes) {
+ unhealthyLocations.add(vol.getStorageLocation());
+ sb.append(vol.getStorageLocation()).append(";");
+ }
- try {
- // Remove all unhealthy volumes from DataNode.
- removeVolumes(unhealthyLocations, false);
- } catch (IOException e) {
- LOG.warn("Error occurred when removing unhealthy storage dirs: "
- + e.getMessage(), e);
- }
- LOG.info(sb.toString());
- handleDiskError(sb.toString());
+ try {
+ // Remove all unhealthy volumes from DataNode.
+ removeVolumes(unhealthyLocations, false);
+ } catch (IOException e) {
+ LOG.warn("Error occurred when removing unhealthy storage dirs: "
+ + e.getMessage(), e);
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sb.toString());
+ }
+ // send blockreport regarding volume failure
+ handleDiskError(sb.toString());
}
+ @VisibleForTesting
public long getLastDiskErrorCheck() {
return lastDiskErrorCheck;
}
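Both checkDiskErrorAsync overloads above share the same callback handling; schematically (stub types and hypothetical names, with BiConsumer standing in for DatasetVolumeChecker.Callback):

    import java.util.Set;
    import java.util.function.BiConsumer;

    interface Volume { }  // stub

    class VolumeCheckerSketch {
      void checkAllVolumesAsync(BiConsumer<Set<Volume>, Set<Volume>> cb) {
        // schedule checks of every volume
      }
      void checkVolume(Volume v, BiConsumer<Set<Volume>, Set<Volume>> cb) {
        // schedule a check of one volume
      }
    }

    class DataNodeSketch {
      private final VolumeCheckerSketch volumeChecker = new VolumeCheckerSketch();

      // Pre-existing variant, now kept mainly for test verification.
      void checkDiskErrorAsync() {
        volumeChecker.checkAllVolumesAsync(
            (healthy, failed) -> handleVolumeFailures(failed));
      }

      // New variant, invoked from FileIoProvider on a per-volume IO failure.
      void checkDiskErrorAsync(Volume volume) {
        volumeChecker.checkVolume(
            volume, (healthy, failed) -> handleVolumeFailures(failed));
      }

      private void handleVolumeFailures(Set<Volume> failed) {
        if (!failed.isEmpty()) {
          System.out.println("removing " + failed.size() + " failed volume(s)");
        }
      }
    }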
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
index bd4932b..6a12aae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
@@ -31,7 +31,7 @@ import javax.annotation.Nullable;
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public final class DefaultFileIoEvents implements FileIoEvents {
+public final class DefaultFileIoEvents extends FileIoEvents {
@Override
public long beforeMetadataOp(
@Nullable FsVolumeSpi volume, OPERATION op) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
index 48e703f..10f2a0c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
@@ -32,7 +32,7 @@ import javax.annotation.Nullable;
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public interface FileIoEvents {
+public abstract class FileIoEvents {
/**
* Invoked before a filesystem metadata operation.
@@ -42,7 +42,7 @@ public interface FileIoEvents {
* @return timestamp at which the operation was started. 0 if
* unavailable.
*/
- long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op);
+ abstract long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op);
/**
* Invoked after a filesystem metadata operation has completed.
@@ -52,7 +52,8 @@ public interface FileIoEvents {
* @param begin timestamp at which the operation was started. 0
* if unavailable.
*/
- void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op, long begin);
+ abstract void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op,
+ long begin);
/**
* Invoked before a read/write/flush/channel transfer operation.
@@ -63,7 +64,8 @@ public interface FileIoEvents {
* @return timestamp at which the operation was started. 0 if
* unavailable.
*/
- long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op, long len);
+ abstract long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
+ long len);
/**
@@ -76,22 +78,38 @@ public interface FileIoEvents {
* @return timestamp at which the operation was started. 0 if
* unavailable.
*/
- void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
- long begin, long len);
+ abstract void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
+ long begin, long len);
/**
* Invoked if an operation fails with an exception.
- * @param volume target volume for the operation. Null if unavailable.
+ * @param volume target volume for the operation. Null if unavailable.
* @param op type of operation.
* @param e Exception encountered during the operation.
* @param begin time at which the operation was started.
*/
- void onFailure(
+ abstract void onFailure(
@Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin);
/**
+ * Invoked by FileIoProvider if an operation fails with an exception.
+ * @param datanode datanode that runs volume check upon volume io failure
+ * @param volume target volume for the operation. Null if unavailable.
+ * @param op type of operation.
+ * @param e Exception encountered during the operation.
+ * @param begin time at which the operation was started.
+ */
+ void onFailure(DataNode datanode,
+ @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
+ onFailure(volume, op, e, begin);
+ if (datanode != null && volume != null) {
+ datanode.checkDiskErrorAsync(volume);
+ }
+ }
+
+ /**
* Return statistics as a JSON string.
* @return
*/
- @Nullable String getStatistics();
+ @Nullable abstract String getStatistics();
}
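The key consequence of turning FileIoEvents into an abstract class is that the new concrete onFailure(DataNode, ...) helper runs for every subclass, so even a no-op hook still triggers the per-volume check. A stub-typed sketch of that mechanism (names are illustrative, not the real classes; the OPERATION parameter is omitted for brevity):

    interface VolumeStub { }

    class DataNodeRef {
      void checkDiskErrorAsync(VolumeStub volume) {
        // schedule the per-volume check
      }
    }

    abstract class FileIoEventsSketch {
      // Subclasses implement metrics/profiling behavior here.
      abstract void onFailure(VolumeStub volume, Exception e, long begin);

      // Concrete helper added by the patch: record the failure, then
      // kick off the check of just the affected volume.
      void onFailure(DataNodeRef datanode, VolumeStub volume,
                     Exception e, long begin) {
        onFailure(volume, e, begin);
        if (datanode != null && volume != null) {
          datanode.checkDiskErrorAsync(volume);
        }
      }
    }

    // Even a do-nothing hook (cf. DefaultFileIoEvents) inherits the check.
    class NoOpEventsSketch extends FileIoEventsSketch {
      @Override
      void onFailure(VolumeStub volume, Exception e, long begin) { }
    }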
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
index 2344114..f961049 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
@@ -79,12 +79,16 @@ public class FileIoProvider {
FileIoProvider.class);
private final FileIoEvents eventHooks;
+ private final DataNode datanode;
/**
* @param conf Configuration object. May be null. When null,
* the event handlers are no-ops.
+ * @param datanode datanode that owns this FileIoProvider. Used for
+ * IO error based volume checker callback
*/
- public FileIoProvider(@Nullable Configuration conf) {
+ public FileIoProvider(@Nullable Configuration conf,
+ final DataNode datanode) {
if (conf != null) {
final Class<? extends FileIoEvents> clazz = conf.getClass(
DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY,
@@ -94,6 +98,7 @@ public class FileIoProvider {
} else {
eventHooks = new DefaultFileIoEvents();
}
+ this.datanode = datanode;
}
/**
@@ -139,7 +144,7 @@ public class FileIoProvider {
f.flush();
eventHooks.afterFileIo(volume, FLUSH, begin, 0);
} catch (Exception e) {
- eventHooks.onFailure(volume, FLUSH, e, begin);
+ eventHooks.onFailure(datanode, volume, FLUSH, e, begin);
throw e;
}
}
@@ -157,7 +162,7 @@ public class FileIoProvider {
fos.getChannel().force(true);
eventHooks.afterFileIo(volume, SYNC, begin, 0);
} catch (Exception e) {
- eventHooks.onFailure(volume, SYNC, e, begin);
+ eventHooks.onFailure(datanode, volume, SYNC, e, begin);
throw e;
}
}
@@ -176,7 +181,7 @@ public class FileIoProvider {
NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, numBytes, flags);
eventHooks.afterFileIo(volume, SYNC, begin, 0);
} catch (Exception e) {
- eventHooks.onFailure(volume, SYNC, e, begin);
+ eventHooks.onFailure(datanode, volume, SYNC, e, begin);
throw e;
}
}
@@ -196,7 +201,7 @@ public class FileIoProvider {
identifier, outFd, offset, length, flags);
eventHooks.afterMetadataOp(volume, FADVISE, begin);
} catch (Exception e) {
- eventHooks.onFailure(volume, FADVISE, e, begin);
+ eventHooks.onFailure(datanode, volume, FADVISE, e, begin);
throw e;
}
}
@@ -214,7 +219,7 @@ public class FileIoProvider {
eventHooks.afterMetadataOp(volume, DELETE, begin);
return deleted;
} catch (Exception e) {
- eventHooks.onFailure(volume, DELETE, e, begin);
+ eventHooks.onFailure(datanode, volume, DELETE, e, begin);
throw e;
}
}
@@ -236,7 +241,7 @@ public class FileIoProvider {
}
return deleted;
} catch (Exception e) {
- eventHooks.onFailure(volume, DELETE, e, begin);
+ eventHooks.onFailure(datanode, volume, DELETE, e, begin);
throw e;
}
}
@@ -264,7 +269,7 @@ public class FileIoProvider {
waitTime, transferTime);
eventHooks.afterFileIo(volume, TRANSFER, begin, count);
} catch (Exception e) {
- eventHooks.onFailure(volume, TRANSFER, e, begin);
+ eventHooks.onFailure(datanode, volume, TRANSFER, e, begin);
throw e;
}
}
@@ -285,7 +290,7 @@ public class FileIoProvider {
eventHooks.afterMetadataOp(volume, OPEN, begin);
return created;
} catch (Exception e) {
- eventHooks.onFailure(volume, OPEN, e, begin);
+ eventHooks.onFailure(datanode, volume, OPEN, e, begin);
throw e;
}
}
@@ -312,7 +317,7 @@ public class FileIoProvider {
return fis;
} catch(Exception e) {
org.apache.commons.io.IOUtils.closeQuietly(fis);
- eventHooks.onFailure(volume, OPEN, e, begin);
+ eventHooks.onFailure(datanode, volume, OPEN, e, begin);
throw e;
}
}
@@ -328,7 +333,7 @@ public class FileIoProvider {
* @param f File object.
* @param append if true, then bytes will be written to the end of the
* file rather than the beginning.
- * @param FileOutputStream to the given file object.
+ * @return FileOutputStream to the given file object.
* @throws FileNotFoundException
*/
public FileOutputStream getFileOutputStream(
@@ -342,7 +347,7 @@ public class FileIoProvider {
return fos;
} catch(Exception e) {
org.apache.commons.io.IOUtils.closeQuietly(fos);
- eventHooks.onFailure(volume, OPEN, e, begin);
+ eventHooks.onFailure(datanode, volume, OPEN, e, begin);
throw e;
}
}
@@ -372,7 +377,7 @@ public class FileIoProvider {
* before delegating to the wrapped stream.
*
* @param volume target volume. null if unavailable.
- * @param f File object.
+ * @param fd File descriptor object.
* @return FileOutputStream to the given file object.
* @throws FileNotFoundException
*/
@@ -407,7 +412,7 @@ public class FileIoProvider {
return fis;
} catch(Exception e) {
org.apache.commons.io.IOUtils.closeQuietly(fis);
- eventHooks.onFailure(volume, OPEN, e, begin);
+ eventHooks.onFailure(datanode, volume, OPEN, e, begin);
throw e;
}
}
@@ -438,7 +443,7 @@ public class FileIoProvider {
return fis;
} catch(Exception e) {
org.apache.commons.io.IOUtils.closeQuietly(fis);
- eventHooks.onFailure(volume, OPEN, e, begin);
+ eventHooks.onFailure(datanode, volume, OPEN, e, begin);
throw e;
}
}
@@ -468,7 +473,7 @@ public class FileIoProvider {
return raf;
} catch(Exception e) {
org.apache.commons.io.IOUtils.closeQuietly(raf);
- eventHooks.onFailure(volume, OPEN, e, begin);
+ eventHooks.onFailure(datanode, volume, OPEN, e, begin);
throw e;
}
}
@@ -487,7 +492,7 @@ public class FileIoProvider {
eventHooks.afterMetadataOp(volume, DELETE, begin);
return deleted;
} catch(Exception e) {
- eventHooks.onFailure(volume, DELETE, e, begin);
+ eventHooks.onFailure(datanode, volume, DELETE, e, begin);
throw e;
}
}
@@ -508,7 +513,7 @@ public class FileIoProvider {
FileUtil.replaceFile(src, target);
eventHooks.afterMetadataOp(volume, MOVE, begin);
} catch(Exception e) {
- eventHooks.onFailure(volume, MOVE, e, begin);
+ eventHooks.onFailure(datanode, volume, MOVE, e, begin);
throw e;
}
}
@@ -530,7 +535,7 @@ public class FileIoProvider {
Storage.rename(src, target);
eventHooks.afterMetadataOp(volume, MOVE, begin);
} catch(Exception e) {
- eventHooks.onFailure(volume, MOVE, e, begin);
+ eventHooks.onFailure(datanode, volume, MOVE, e, begin);
throw e;
}
}
@@ -552,7 +557,7 @@ public class FileIoProvider {
FileUtils.moveFile(src, target);
eventHooks.afterMetadataOp(volume, MOVE, begin);
} catch(Exception e) {
- eventHooks.onFailure(volume, MOVE, e, begin);
+ eventHooks.onFailure(datanode, volume, MOVE, e, begin);
throw e;
}
}
@@ -576,7 +581,7 @@ public class FileIoProvider {
Files.move(src, target, options);
eventHooks.afterMetadataOp(volume, MOVE, begin);
} catch(Exception e) {
- eventHooks.onFailure(volume, MOVE, e, begin);
+ eventHooks.onFailure(datanode, volume, MOVE, e, begin);
throw e;
}
}
@@ -600,7 +605,7 @@ public class FileIoProvider {
Storage.nativeCopyFileUnbuffered(src, target, preserveFileDate);
eventHooks.afterFileIo(volume, NATIVE_COPY, begin, length);
} catch(Exception e) {
- eventHooks.onFailure(volume, NATIVE_COPY, e, begin);
+ eventHooks.onFailure(datanode, volume, NATIVE_COPY, e, begin);
throw e;
}
}
@@ -625,7 +630,7 @@ public class FileIoProvider {
isDirectory = !created && dir.isDirectory();
eventHooks.afterMetadataOp(volume, MKDIRS, begin);
} catch(Exception e) {
- eventHooks.onFailure(volume, MKDIRS, e, begin);
+ eventHooks.onFailure(datanode, volume, MKDIRS, e, begin);
throw e;
}
@@ -651,7 +656,7 @@ public class FileIoProvider {
succeeded = dir.isDirectory() || dir.mkdirs();
eventHooks.afterMetadataOp(volume, MKDIRS, begin);
} catch(Exception e) {
- eventHooks.onFailure(volume, MKDIRS, e, begin);
+ eventHooks.onFailure(datanode, volume, MKDIRS, e, begin);
throw e;
}
@@ -677,7 +682,7 @@ public class FileIoProvider {
eventHooks.afterMetadataOp(volume, LIST, begin);
return children;
} catch(Exception e) {
- eventHooks.onFailure(volume, LIST, e, begin);
+ eventHooks.onFailure(datanode, volume, LIST, e, begin);
throw e;
}
}
@@ -687,7 +692,7 @@ public class FileIoProvider {
* {@link FileUtil#listFiles(File)}.
*
* @param volume target volume. null if unavailable.
- * @param Driectory to be listed.
+ * @param dir directory to be listed.
* @return array of strings representing the directory entries.
* @throws IOException
*/
@@ -699,7 +704,7 @@ public class FileIoProvider {
eventHooks.afterMetadataOp(volume, LIST, begin);
return children;
} catch(Exception e) {
- eventHooks.onFailure(volume, LIST, e, begin);
+ eventHooks.onFailure(datanode, volume, LIST, e, begin);
throw e;
}
}
@@ -722,7 +727,7 @@ public class FileIoProvider {
eventHooks.afterMetadataOp(volume, LIST, begin);
return children;
} catch(Exception e) {
- eventHooks.onFailure(volume, LIST, e, begin);
+ eventHooks.onFailure(datanode, volume, LIST, e, begin);
throw e;
}
}
@@ -744,7 +749,7 @@ public class FileIoProvider {
eventHooks.afterMetadataOp(volume, LIST, begin);
return count;
} catch(Exception e) {
- eventHooks.onFailure(volume, LIST, e, begin);
+ eventHooks.onFailure(datanode, volume, LIST, e, begin);
throw e;
}
}
@@ -763,7 +768,7 @@ public class FileIoProvider {
eventHooks.afterMetadataOp(volume, EXISTS, begin);
return exists;
} catch(Exception e) {
- eventHooks.onFailure(volume, EXISTS, e, begin);
+ eventHooks.onFailure(datanode, volume, EXISTS, e, begin);
throw e;
}
}
@@ -804,7 +809,7 @@ public class FileIoProvider {
eventHooks.afterFileIo(volume, READ, begin, 1);
return b;
} catch(Exception e) {
- eventHooks.onFailure(volume, READ, e, begin);
+ eventHooks.onFailure(datanode, volume, READ, e, begin);
throw e;
}
}
@@ -820,7 +825,7 @@ public class FileIoProvider {
eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
return numBytesRead;
} catch(Exception e) {
- eventHooks.onFailure(volume, READ, e, begin);
+ eventHooks.onFailure(datanode, volume, READ, e, begin);
throw e;
}
}
@@ -836,7 +841,7 @@ public class FileIoProvider {
eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
return numBytesRead;
} catch(Exception e) {
- eventHooks.onFailure(volume, READ, e, begin);
+ eventHooks.onFailure(datanode, volume, READ, e, begin);
throw e;
}
}
@@ -878,7 +883,7 @@ public class FileIoProvider {
super.write(b);
eventHooks.afterFileIo(volume, WRITE, begin, 1);
} catch(Exception e) {
- eventHooks.onFailure(volume, WRITE, e, begin);
+ eventHooks.onFailure(datanode, volume, WRITE, e, begin);
throw e;
}
}
@@ -893,7 +898,7 @@ public class FileIoProvider {
super.write(b);
eventHooks.afterFileIo(volume, WRITE, begin, b.length);
} catch(Exception e) {
- eventHooks.onFailure(volume, WRITE, e, begin);
+ eventHooks.onFailure(datanode, volume, WRITE, e, begin);
throw e;
}
}
@@ -908,7 +913,7 @@ public class FileIoProvider {
super.write(b, off, len);
eventHooks.afterFileIo(volume, WRITE, begin, len);
} catch(Exception e) {
- eventHooks.onFailure(volume, WRITE, e, begin);
+ eventHooks.onFailure(datanode, volume, WRITE, e, begin);
throw e;
}
}
@@ -936,7 +941,7 @@ public class FileIoProvider {
eventHooks.afterFileIo(volume, READ, begin, 1);
return b;
} catch(Exception e) {
- eventHooks.onFailure(volume, READ, e, begin);
+ eventHooks.onFailure(datanode, volume, READ, e, begin);
throw e;
}
}
@@ -949,7 +954,7 @@ public class FileIoProvider {
eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
return numBytesRead;
} catch(Exception e) {
- eventHooks.onFailure(volume, READ, e, begin);
+ eventHooks.onFailure(datanode, volume, READ, e, begin);
throw e;
}
}
@@ -962,7 +967,7 @@ public class FileIoProvider {
eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
return numBytesRead;
} catch(Exception e) {
- eventHooks.onFailure(volume, READ, e, begin);
+ eventHooks.onFailure(datanode, volume, READ, e, begin);
throw e;
}
}
@@ -974,7 +979,7 @@ public class FileIoProvider {
super.write(b);
eventHooks.afterFileIo(volume, WRITE, begin, 1);
} catch(Exception e) {
- eventHooks.onFailure(volume, WRITE, e, begin);
+ eventHooks.onFailure(datanode, volume, WRITE, e, begin);
throw e;
}
}
@@ -986,7 +991,7 @@ public class FileIoProvider {
super.write(b);
eventHooks.afterFileIo(volume, WRITE, begin, b.length);
} catch(Exception e) {
- eventHooks.onFailure(volume, WRITE, e, begin);
+ eventHooks.onFailure(datanode, volume, WRITE, e, begin);
throw e;
}
}
@@ -998,7 +1003,7 @@ public class FileIoProvider {
super.write(b, off, len);
eventHooks.afterFileIo(volume, WRITE, begin, len);
} catch(Exception e) {
- eventHooks.onFailure(volume, WRITE, e, begin);
+ eventHooks.onFailure(datanode, volume, WRITE, e, begin);
throw e;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java
index 5835fe8..affd093 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java
@@ -30,7 +30,7 @@ import javax.annotation.Nullable;
* related operations on datanode volumes.
*/
@InterfaceAudience.Private
-class ProfilingFileIoEvents implements FileIoEvents {
+class ProfilingFileIoEvents extends FileIoEvents {
@Override
public long beforeMetadataOp(@Nullable FsVolumeSpi volume,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
index d3006c8..65e9ba7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
@@ -47,7 +47,7 @@ abstract public class ReplicaInfo extends Block
/** This is used by some tests and FsDatasetUtil#computeChecksum. */
private static final FileIoProvider DEFAULT_FILE_IO_PROVIDER =
- new FileIoProvider(null);
+ new FileIoProvider(null, null);
/**
* Constructor
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java
index 1d534a3..997c0cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode.checker;
+import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -43,10 +44,10 @@ public interface AsyncChecker<K, V> {
* @param context the interpretation of the context depends on the
* target.
*
- * @return returns a {@link ListenableFuture} that can be used to
+ * @return returns a {@link Optional of ListenableFuture} that can be used to
* retrieve the result of the asynchronous check.
*/
- ListenableFuture<V> schedule(Checkable<K, V> target, K context);
+ Optional<ListenableFuture<V>> schedule(Checkable<K, V> target, K context);
/**
* Cancel all executing checks and wait for them to complete.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
index ba09d23..cab6122 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs.server.datanode.checker;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
@@ -191,18 +192,26 @@ public class DatasetVolumeChecker {
for (int i = 0; i < references.size(); ++i) {
final FsVolumeReference reference = references.getReference(i);
- allVolumes.add(reference.getVolume());
- ListenableFuture<VolumeCheckResult> future =
+ Optional<ListenableFuture<VolumeCheckResult>> olf =
delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
LOG.info("Scheduled health check for volume {}", reference.getVolume());
- Futures.addCallback(future, new ResultHandler(
- reference, healthyVolumes, failedVolumes, numVolumes, new Callback() {
- @Override
- public void call(Set<FsVolumeSpi> ignored1,
- Set<FsVolumeSpi> ignored2) {
+ if (olf.isPresent()) {
+ allVolumes.add(reference.getVolume());
+ Futures.addCallback(olf.get(),
+ new ResultHandler(reference, healthyVolumes, failedVolumes,
+ numVolumes, new Callback() {
+ @Override
+ public void call(Set<FsVolumeSpi> ignored1,
+ Set<FsVolumeSpi> ignored2) {
+ latch.countDown();
+ }
+ }));
+ } else {
+ IOUtils.cleanup(null, reference);
+ if (numVolumes.decrementAndGet() == 0) {
latch.countDown();
}
- }));
+ }
}
// Wait until our timeout elapses, after which we give up on
@@ -263,18 +272,26 @@ public class DatasetVolumeChecker {
final Set<FsVolumeSpi> healthyVolumes = new HashSet<>();
final Set<FsVolumeSpi> failedVolumes = new HashSet<>();
final AtomicLong numVolumes = new AtomicLong(references.size());
+ boolean added = false;
LOG.info("Checking {} volumes", references.size());
for (int i = 0; i < references.size(); ++i) {
final FsVolumeReference reference = references.getReference(i);
// The context parameter is currently ignored.
- ListenableFuture<VolumeCheckResult> future =
+ Optional<ListenableFuture<VolumeCheckResult>> olf =
delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
- Futures.addCallback(future, new ResultHandler(
- reference, healthyVolumes, failedVolumes, numVolumes, callback));
+ if (olf.isPresent()) {
+ added = true;
+ Futures.addCallback(olf.get(),
+ new ResultHandler(reference, healthyVolumes, failedVolumes,
+ numVolumes, callback));
+ } else {
+ IOUtils.cleanup(null, reference);
+ numVolumes.decrementAndGet();
+ }
}
numAsyncDatasetChecks.incrementAndGet();
- return true;
+ return added;
}
/**
@@ -291,7 +308,7 @@ public class DatasetVolumeChecker {
}
/**
- * Check a single volume, returning a {@link ListenableFuture}
+ * Check a single volume asynchronously, returning a {@link ListenableFuture}
* that can be used to retrieve the final result.
*
* If the volume cannot be referenced then it is already closed and
@@ -305,21 +322,31 @@ public class DatasetVolumeChecker {
public boolean checkVolume(
final FsVolumeSpi volume,
Callback callback) {
+ if (volume == null) {
+ LOG.debug("Cannot schedule check on null volume");
+ return false;
+ }
+
FsVolumeReference volumeReference;
try {
volumeReference = volume.obtainReference();
} catch (ClosedChannelException e) {
// The volume has already been closed.
- callback.call(new HashSet<>(), new HashSet<>());
return false;
}
- ListenableFuture<VolumeCheckResult> future =
+
+ Optional<ListenableFuture<VolumeCheckResult>> olf =
delegateChecker.schedule(volume, IGNORED_CONTEXT);
- numVolumeChecks.incrementAndGet();
- Futures.addCallback(future, new ResultHandler(
- volumeReference, new HashSet<>(), new HashSet<>(),
- new AtomicLong(1), callback));
- return true;
+ if (olf.isPresent()) {
+ numVolumeChecks.incrementAndGet();
+ Futures.addCallback(olf.get(),
+ new ResultHandler(volumeReference, new HashSet<>(), new HashSet<>(),
+ new AtomicLong(1), callback));
+ return true;
+ } else {
+ IOUtils.cleanup(null, volumeReference);
+ }
+ return false;
}
/**
@@ -343,8 +370,8 @@ public class DatasetVolumeChecker {
* successful, add the volume here.
* @param failedVolumes set of failed volumes. If the disk check fails,
* add the volume here.
- * @param semaphore semaphore used to trigger callback invocation.
- * @param callback invoked when the semaphore can be successfully acquired.
+ * @param volumeCounter volumeCounter used to trigger callback invocation.
+ * @param callback invoked when the volumeCounter reaches 0.
*/
ResultHandler(FsVolumeReference reference,
Set<FsVolumeSpi> healthyVolumes,
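A recurring pattern in the hunks above: when schedule() declines (returns absent, i.e. a check is already in flight or throttled), the caller must release the volume reference and decrement the pending-volume counter so the aggregate callback still fires. A condensed bookkeeping sketch under those assumptions:

    import com.google.common.base.Optional;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicLong;

    class AbsentHandlingSketch {
      // 'scheduled' stands in for Optional<ListenableFuture<VolumeCheckResult>>;
      // 'reference' for the FsVolumeReference held open for the check.
      void onScheduleResult(Optional<?> scheduled, AutoCloseable reference,
                            AtomicLong numVolumes, CountDownLatch latch)
          throws Exception {
        if (scheduled.isPresent()) {
          // Real code attaches a ResultHandler via Futures.addCallback(...).
          return;
        }
        reference.close();  // equivalent of IOUtils.cleanup(null, reference)
        if (numVolumes.decrementAndGet() == 0) {
          latch.countDown(); // don't strand the waiting checkAllVolumes()
        }
      }
    }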
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
index 7337ad0..6e323e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.checker;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -158,8 +159,11 @@ public class StorageLocationChecker {
// Start parallel disk check operations on all StorageLocations.
for (StorageLocation location : dataDirs) {
goodLocations.put(location, true);
- futures.put(location,
- delegateChecker.schedule(location, context));
+ Optional<ListenableFuture<VolumeCheckResult>> olf =
+ delegateChecker.schedule(location, context);
+ if (olf.isPresent()) {
+ futures.put(location, olf.get());
+ }
}
if (maxVolumeFailuresTolerated >= dataDirs.size()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
index d0ee3d2..83c554d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode.checker;
+import com.google.common.base.Optional;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -101,13 +102,11 @@ public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
* will receive the same Future.
*/
@Override
- public synchronized ListenableFuture<V> schedule(
- final Checkable<K, V> target,
- final K context) {
- LOG.debug("Scheduling a check of {}", target);
-
+ public Optional<ListenableFuture<V>> schedule(Checkable<K, V> target,
+ K context) {
+ LOG.info("Scheduling a check for {}", target);
if (checksInProgress.containsKey(target)) {
- return checksInProgress.get(target);
+ return Optional.absent();
}
if (completedChecks.containsKey(target)) {
@@ -115,11 +114,9 @@ public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
final long msSinceLastCheck = timer.monotonicNow() - result.completedAt;
if (msSinceLastCheck < minMsBetweenChecks) {
LOG.debug("Skipped checking {}. Time since last check {}ms " +
- "is less than the min gap {}ms.",
+ "is less than the min gap {}ms.",
target, msSinceLastCheck, minMsBetweenChecks);
- return result.result != null ?
- Futures.immediateFuture(result.result) :
- Futures.immediateFailedFuture(result.exception);
+ return Optional.absent();
}
}
@@ -132,7 +129,7 @@ public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
});
checksInProgress.put(target, lf);
addResultCachingCallback(target, lf);
- return lf;
+ return Optional.of(lf);
}
/**
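The contract change in schedule() is the crux of the throttling: instead of handing back a cached or in-flight future, the checker now returns Optional.absent() when the target is already being checked or the minimum gap since the last check has not elapsed. A self-contained sketch of that logic using Guava's Optional (the in-progress and completed bookkeeping is collapsed into one map for brevity, and a timestamp stands in for the ListenableFuture):

    import com.google.common.base.Optional;
    import java.util.HashMap;
    import java.util.Map;

    class ThrottleSketch<K> {
      private final Map<K, Long> lastCompletedAtMs = new HashMap<>();
      private final long minMsBetweenChecks;

      ThrottleSketch(long minMsBetweenChecks) {
        this.minMsBetweenChecks = minMsBetweenChecks;
      }

      // Returns the scheduled timestamp in place of a ListenableFuture.
      synchronized Optional<Long> schedule(K target, long nowMs) {
        Long last = lastCompletedAtMs.get(target);
        if (last != null && nowMs - last < minMsBetweenChecks) {
          return Optional.absent();   // skipped: within the min gap
        }
        lastCompletedAtMs.put(target, nowMs);
        return Optional.of(nowMs);    // check scheduled
      }
    }

Callers that previously relied on getting a cached result future back must now treat absent() as "no new check started", which is exactly how DatasetVolumeChecker and StorageLocationChecker handle it above.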
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 8273ebb..c8df300 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -271,7 +271,10 @@ class BlockPoolSlice {
new FileOutputStream(outFile), "UTF-8")) {
// mtime is written last, so that truncated writes won't be valid.
out.write(Long.toString(used) + " " + Long.toString(timer.now()));
- fileIoProvider.flush(volume, out);
+ // This is only called as part of the volume shutdown.
+ // We explicitly avoid calling flush with fileIoProvider which triggers
+ // volume check upon io exception to avoid cyclic volume checks.
+ out.flush();
}
} catch (IOException ioe) {
// If write failed, the volume might be bad. Since the cache file is
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 0d5a12c..d1f8f05 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1828,7 +1828,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return r;
}
// if file is not null, but doesn't exist - possibly disk failed
- datanode.checkDiskErrorAsync();
+ datanode.checkDiskErrorAsync(r.getVolume());
}
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 753c083..042ef6e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -1321,8 +1321,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
this, dir, BlockDirFilter.INSTANCE);
} catch (IOException ioe) {
LOG.warn("Exception occured while compiling report: ", ioe);
- // Initiate a check on disk failure.
- dataset.datanode.checkDiskErrorAsync();
+ // Volume error check moved to FileIoProvider.
// Ignore this directory and proceed.
return report;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
index 5371eda..427f81b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
@@ -69,7 +69,7 @@ public class FsVolumeImplBuilder {
FsVolumeImpl build() throws IOException {
return new FsVolumeImpl(
dataset, storageID, sd,
- fileIoProvider != null ? fileIoProvider : new FileIoProvider(null),
- conf);
+ fileIoProvider != null ? fileIoProvider :
+ new FileIoProvider(null, null), conf);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 8472eca..cd3befd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -624,7 +624,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
registerMBean(datanodeUuid);
- this.fileIoProvider = new FileIoProvider(conf);
+ this.fileIoProvider = new FileIoProvider(conf, datanode);
this.storage = new SimulatedStorage(
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index e31e783..96d1a28 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -807,7 +807,7 @@ public class TestDataNodeHotSwapVolumes {
DataNodeTestUtils.injectDataDirFailure(dirToFail);
// Call and wait DataNode to detect disk failure.
long lastDiskErrorCheck = dn.getLastDiskErrorCheck();
- dn.checkDiskErrorAsync();
+ dn.checkDiskErrorAsync(failedVolume);
while (dn.getLastDiskErrorCheck() == lastDiskErrorCheck) {
Thread.sleep(100);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
index 3d37b10..3015e61 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
import static org.junit.Assert.assertArrayEquals;
@@ -319,6 +319,12 @@ public class TestDataNodeVolumeFailureReporting {
DFSTestUtil.createFile(fs, file1, 1024, (short)3, 1L);
DFSTestUtil.waitReplication(fs, file1, (short)3);
+ // Create additional file to trigger failure based volume check on dn1Vol2
+ // and dn2Vol2.
+ Path file2 = new Path("/test2");
+ DFSTestUtil.createFile(fs, file2, 1024, (short)3, 1L);
+ DFSTestUtil.waitReplication(fs, file2, (short)3);
+
ArrayList<DataNode> dns = cluster.getDataNodes();
assertTrue("DN1 should be up", dns.get(0).isDatanodeUp());
assertTrue("DN2 should be up", dns.get(1).isDatanodeUp());
@@ -538,8 +544,6 @@ public class TestDataNodeVolumeFailureReporting {
private void checkFailuresAtDataNode(DataNode dn,
long expectedVolumeFailuresCounter, boolean expectCapacityKnown,
String... expectedFailedVolumes) throws Exception {
- assertCounter("VolumeFailures", expectedVolumeFailuresCounter,
- getMetrics(dn.getMetrics().name()));
FsDatasetSpi<?> fsd = dn.getFSDataset();
StringBuilder strBuilder = new StringBuilder();
strBuilder.append("expectedFailedVolumes is ");
@@ -551,6 +555,11 @@ public class TestDataNodeVolumeFailureReporting {
strBuilder.append(expected + ",");
}
LOG.info(strBuilder.toString());
+ final long actualVolumeFailures =
+ getLongCounter("VolumeFailures", getMetrics(dn.getMetrics().name()));
+ assertTrue("Actual async detected volume failures should be greater or " +
+ "equal than " + expectedFailedVolumes,
+ actualVolumeFailures >= expectedVolumeFailuresCounter);
assertEquals(expectedFailedVolumes.length, fsd.getNumFailedVolumes());
assertArrayEquals(expectedFailedVolumes,
convertToAbsolutePaths(fsd.getFailedStorageLocations()));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
index 50096ba..f5bb807 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode.checker;
+import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -104,24 +105,28 @@ public class TestDatasetVolumeChecker {
/**
* Request a check and ensure it triggered {@link FsVolumeSpi#check}.
*/
- checker.checkVolume(volume, new DatasetVolumeChecker.Callback() {
- @Override
- public void call(Set<FsVolumeSpi> healthyVolumes,
- Set<FsVolumeSpi> failedVolumes) {
- numCallbackInvocations.incrementAndGet();
- if (expectedVolumeHealth != null && expectedVolumeHealth != FAILED) {
- assertThat(healthyVolumes.size(), is(1));
- assertThat(failedVolumes.size(), is(0));
- } else {
- assertThat(healthyVolumes.size(), is(0));
- assertThat(failedVolumes.size(), is(1));
- }
- }
- });
+ boolean result =
+ checker.checkVolume(volume, new DatasetVolumeChecker.Callback() {
+ @Override
+ public void call(Set<FsVolumeSpi> healthyVolumes,
+ Set<FsVolumeSpi> failedVolumes) {
+ numCallbackInvocations.incrementAndGet();
+ if (expectedVolumeHealth != null &&
+ expectedVolumeHealth != FAILED) {
+ assertThat(healthyVolumes.size(), is(1));
+ assertThat(failedVolumes.size(), is(0));
+ } else {
+ assertThat(healthyVolumes.size(), is(0));
+ assertThat(failedVolumes.size(), is(1));
+ }
+ }
+ });
// Ensure that the check was invoked at least once.
verify(volume, times(1)).check(anyObject());
- assertThat(numCallbackInvocations.get(), is(1L));
+ if (result) {
+ assertThat(numCallbackInvocations.get(), is(1L));
+ }
}
/**
@@ -173,7 +178,7 @@ public class TestDatasetVolumeChecker {
checker.setDelegateChecker(new DummyChecker());
final AtomicLong numCallbackInvocations = new AtomicLong(0);
- checker.checkAllVolumesAsync(
+ boolean result = checker.checkAllVolumesAsync(
dataset, new DatasetVolumeChecker.Callback() {
@Override
public void call(
@@ -193,7 +198,9 @@ public class TestDatasetVolumeChecker {
});
// The callback should be invoked exactly once.
- assertThat(numCallbackInvocations.get(), is(1L));
+ if (result) {
+ assertThat(numCallbackInvocations.get(), is(1L));
+ }
// Ensure each volume's check() method was called exactly once.
for (FsVolumeSpi volume : volumes) {
@@ -207,15 +214,17 @@ public class TestDatasetVolumeChecker {
*/
static class DummyChecker
implements AsyncChecker<VolumeCheckContext, VolumeCheckResult> {
+
@Override
- public ListenableFuture<VolumeCheckResult> schedule(
+ public Optional<ListenableFuture<VolumeCheckResult>> schedule(
Checkable<VolumeCheckContext, VolumeCheckResult> target,
VolumeCheckContext context) {
try {
- return Futures.immediateFuture(target.check(context));
+ return Optional.of(
+ Futures.immediateFuture(target.check(context)));
} catch (Exception e) {
LOG.info("check routine threw exception " + e);
- return Futures.immediateFailedFuture(e);
+ return Optional.of(Futures.immediateFailedFuture(e));
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
index 16c333b..0fe892d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
@@ -132,29 +132,6 @@ public class TestDatasetVolumeCheckerFailures {
assertThat(checker.getNumSkippedChecks(), is(1L));
}
- @Test(timeout=60000)
- public void testMinGapIsEnforcedForASyncChecks() throws Exception {
- final List<FsVolumeSpi> volumes =
- TestDatasetVolumeChecker.makeVolumes(1, VolumeCheckResult.HEALTHY);
- final FsDatasetSpi<FsVolumeSpi> dataset =
- TestDatasetVolumeChecker.makeDataset(volumes);
- final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer);
-
- checker.checkAllVolumesAsync(dataset, null);
- assertThat(checker.getNumAsyncDatasetChecks(), is(1L));
-
- // Re-check without advancing the timer. Ensure the check is skipped.
- checker.checkAllVolumesAsync(dataset, null);
- assertThat(checker.getNumAsyncDatasetChecks(), is(1L));
- assertThat(checker.getNumSkippedChecks(), is(1L));
-
- // Re-check after advancing the timer. Ensure the check is performed.
- timer.advance(MIN_DISK_CHECK_GAP_MS);
- checker.checkAllVolumesAsync(dataset, null);
- assertThat(checker.getNumAsyncDatasetChecks(), is(2L));
- assertThat(checker.getNumSkippedChecks(), is(1L));
- }
-
/**
* Create a mock FsVolumeSpi whose {@link FsVolumeSpi#check} routine
* hangs forever.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
index 70795ca..c171c0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
@@ -18,15 +18,14 @@
package org.apache.hadoop.hdfs.server.datanode.checker;
+import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.FakeTimer;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,10 +37,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.core.Is.isA;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -53,9 +49,6 @@ public class TestThrottledAsyncChecker {
LoggerFactory.getLogger(TestThrottledAsyncChecker.class);
private static final long MIN_ERROR_CHECK_GAP = 1000;
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
/**
* Test various scheduling combinations to ensure scheduling and
* throttling behave as expected.
@@ -70,34 +63,34 @@ public class TestThrottledAsyncChecker {
getExecutorService());
// check target1 and ensure we get back the expected result.
- assertTrue(checker.schedule(target1, true).get());
- assertThat(target1.numChecks.get(), is(1L));
+ assertTrue(checker.schedule(target1, true).isPresent());
+ waitTestCheckableCheckCount(target1, 1L);
// Check target1 again without advancing the timer. target1 should not
- // be checked again and the cached result should be returned.
- assertTrue(checker.schedule(target1, true).get());
- assertThat(target1.numChecks.get(), is(1L));
+ // be checked again.
+ assertFalse(checker.schedule(target1, true).isPresent());
+ waitTestCheckableCheckCount(target1, 1L);
// Schedule target2 without advancing the timer.
// target2 should be checked as it has never been checked before.
- assertTrue(checker.schedule(target2, true).get());
- assertThat(target2.numChecks.get(), is(1L));
+ assertTrue(checker.schedule(target2, true).isPresent());
+ waitTestCheckableCheckCount(target2, 1L);
// Advance the timer but just short of the min gap.
// Neither target1 nor target2 should be checked again.
timer.advance(MIN_ERROR_CHECK_GAP - 1);
- assertTrue(checker.schedule(target1, true).get());
- assertThat(target1.numChecks.get(), is(1L));
- assertTrue(checker.schedule(target2, true).get());
- assertThat(target2.numChecks.get(), is(1L));
+ assertFalse(checker.schedule(target1, true).isPresent());
+ waitTestCheckableCheckCount(target1, 1L);
+ assertFalse(checker.schedule(target2, true).isPresent());
+ waitTestCheckableCheckCount(target2, 1L);
// Advance the timer again.
// Both targets should be checked now.
timer.advance(MIN_ERROR_CHECK_GAP);
- assertTrue(checker.schedule(target1, true).get());
- assertThat(target1.numChecks.get(), is(2L));
- assertTrue(checker.schedule(target2, true).get());
- assertThat(target1.numChecks.get(), is(2L));
+ assertTrue(checker.schedule(target1, true).isPresent());
+ waitTestCheckableCheckCount(target1, 2L);
+ assertTrue(checker.schedule(target2, true).isPresent());
+ waitTestCheckableCheckCount(target2, 2L);
}
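The timeline the test walks through, with t the FakeTimer value and gap = MIN_ERROR_CHECK_GAP:

    t = 0          schedule(target1) -> present  (first check)
    t = 0          schedule(target1) -> absent   (within gap)
    t = 0          schedule(target2) -> present  (never checked before)
    t = gap - 1    schedule(either)  -> absent   (gap not yet elapsed)
    t = 2*gap - 1  schedule(either)  -> present  (gap elapsed)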
@Test (timeout=60000)
@@ -109,13 +102,16 @@ public class TestThrottledAsyncChecker {
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
getExecutorService());
- ListenableFuture<Boolean> lf = checker.schedule(target, true);
- Futures.addCallback(lf, callback);
+ Optional<ListenableFuture<Boolean>> olf =
+ checker.schedule(target, true);
+ if (olf.isPresent()) {
+ Futures.addCallback(olf.get(), callback);
+ }
// Request immediate cancellation.
checker.shutdownAndWait(0, TimeUnit.MILLISECONDS);
try {
- assertFalse(lf.get());
+ assertFalse(olf.get().get());
fail("Failed to get expected InterruptedException");
} catch (ExecutionException ee) {
assertTrue(ee.getCause() instanceof InterruptedException);
@@ -130,27 +126,33 @@ public class TestThrottledAsyncChecker {
ThrottledAsyncChecker<Boolean, Boolean> checker =
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
getExecutorService());
- final ListenableFuture<Boolean> lf1 = checker.schedule(target, true);
- final ListenableFuture<Boolean> lf2 = checker.schedule(target, true);
+ final Optional<ListenableFuture<Boolean>> olf1 =
+ checker.schedule(target, true);
- // Ensure that concurrent requests return the same future object.
- assertTrue(lf1 == lf2);
+ final Optional<ListenableFuture<Boolean>> olf2 =
+ checker.schedule(target, true);
+
+ // Ensure that concurrent requests return the future object
+ // for the first caller.
+ assertTrue(olf1.isPresent());
+ assertFalse(olf2.isPresent());
// Unblock the latch and wait for it to finish execution.
target.latch.countDown();
- lf1.get();
+ olf1.get().get();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
- // We should not get back the same future as before.
+ // We should get an absent Optional.
// This can take a short while until the internal callback in
// ThrottledAsyncChecker is scheduled for execution.
// Also this should not trigger a new check operation as the timer
// was not advanced. If it does trigger a new check then the test
// will fail with a timeout.
- final ListenableFuture<Boolean> lf3 = checker.schedule(target, true);
- return lf3 != lf2;
+ final Optional<ListenableFuture<Boolean>> olf3 =
+ checker.schedule(target, true);
+ return !olf3.isPresent();
}
}, 100, 10000);
}
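The invariant under test: while a check for a given target is in flight, only the first caller receives the future; every later caller gets an absent Optional and should not expect a result. A minimal sketch (illustrative names):

    Optional<ListenableFuture<Boolean>> first =
        checker.schedule(target, true);  // present: check starts
    Optional<ListenableFuture<Boolean>> second =
        checker.schedule(target, true);  // absent: already in flight
    first.get().get();                   // wait on the one real future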
@@ -168,15 +170,29 @@ public class TestThrottledAsyncChecker {
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
getExecutorService());
- assertTrue(checker.schedule(target1, true).get());
- assertThat(target1.numChecks.get(), is(1L));
+ assertTrue(checker.schedule(target1, true).isPresent());
+ waitTestCheckableCheckCount(target1, 1L);
timer.advance(MIN_ERROR_CHECK_GAP + 1);
- assertFalse(checker.schedule(target1, false).get());
- assertThat(target1.numChecks.get(), is(2L));
+ assertTrue(checker.schedule(target1, false).isPresent());
+ waitTestCheckableCheckCount(target1, 2L);
+
}
+ private void waitTestCheckableCheckCount(TestCheckableBase target,
+ long expectedChecks) throws Exception {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+      // The count is updated by an internal callback in
+      // ThrottledAsyncChecker, which can take a short while to run,
+      // so poll rather than asserting immediately. If a spurious
+      // extra check runs, the count overshoots and this times out.
+ return target.getTotalChecks() == expectedChecks;
+ }
+ }, 100, 10000);
+ }
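Under Java 8, which Hadoop trunk requires by this point, the polling helper could presumably be written with a lambda, since Guava's Supplier has a single abstract method:

    GenericTestUtils.waitFor(
        () -> target.getTotalChecks() == expectedChecks, 100, 10000);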
/**
- * Ensure that the exeption from a failed check is cached
+ * Ensure that a failed check is not re-run when the minimum
* gap has not elapsed; a repeat request is rejected with an
* absent Optional rather than returning the cached exception.
*
@@ -190,13 +206,11 @@ public class TestThrottledAsyncChecker {
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
getExecutorService());
- thrown.expectCause(isA(DummyException.class));
- checker.schedule(target1, true).get();
- assertThat(target1.numChecks.get(), is(1L));
+ assertTrue(checker.schedule(target1, true).isPresent());
+ waitTestCheckableCheckCount(target1, 1L);
- thrown.expectCause(isA(DummyException.class));
- checker.schedule(target1, true).get();
- assertThat(target1.numChecks.get(), is(2L));
+ assertFalse(checker.schedule(target1, true).isPresent());
+ waitTestCheckableCheckCount(target1, 1L);
}
/**
@@ -206,28 +220,38 @@ public class TestThrottledAsyncChecker {
return new ScheduledThreadPoolExecutor(1);
}
+ private abstract static class TestCheckableBase
+ implements Checkable<Boolean, Boolean> {
+ protected final AtomicLong numChecks = new AtomicLong(0);
+
+ public long getTotalChecks() {
+ return numChecks.get();
+ }
+
+ public void incrTotalChecks() {
+ numChecks.incrementAndGet();
+ }
+ }
+
/**
* A Checkable that just returns its input.
*/
private static class NoOpCheckable
- implements Checkable<Boolean, Boolean> {
- private final AtomicLong numChecks = new AtomicLong(0);
+ extends TestCheckableBase {
@Override
public Boolean check(Boolean context) {
- numChecks.incrementAndGet();
+ incrTotalChecks();
return context;
}
}
private static class ThrowingCheckable
- implements Checkable<Boolean, Boolean> {
- private final AtomicLong numChecks = new AtomicLong(0);
+ extends TestCheckableBase {
@Override
public Boolean check(Boolean context) throws DummyException {
- numChecks.incrementAndGet();
+ incrTotalChecks();
throw new DummyException();
}
-
}
private static class DummyException extends Exception {