Posted to common-commits@hadoop.apache.org by xg...@apache.org on 2017/06/05 20:30:51 UTC
[02/50] [abbrv] hadoop git commit: HDFS-11856. Ability to re-add Upgrading Nodes to pipeline for future pipeline updates. Contributed by Vinayakumar B.
HDFS-11856. Ability to re-add Upgrading Nodes to pipeline for future pipeline updates. Contributed by Vinayakumar B.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/81cf4ca6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/81cf4ca6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/81cf4ca6
Branch: refs/heads/YARN-5734
Commit: 81cf4ca686ab7fb64a7a0a1b1d6bed4d8ac6c060
Parents: 595aab8
Author: Kihwal Lee <ki...@apache.org>
Authored: Thu May 25 13:04:09 2017 -0500
Committer: Xuan <xg...@apache.org>
Committed: Mon Jun 5 13:29:17 2017 -0700
----------------------------------------------------------------------
.../hadoop/hdfs/DFSClientFaultInjector.java | 4 +
.../org/apache/hadoop/hdfs/DataStreamer.java | 70 +++++++++++----
.../hdfs/server/datanode/BlockReceiver.java | 6 +-
.../server/datanode/fsdataset/FsDatasetSpi.java | 2 +-
.../impl/FsDatasetAsyncDiskService.java | 14 ++-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 85 ++++++++++++------
.../TestClientProtocolForPipelineRecovery.java | 92 ++++++++++++++++++++
.../server/datanode/SimulatedFSDataset.java | 6 +-
.../server/datanode/TestSimulatedFSDataset.java | 2 +-
.../extdataset/ExternalDatasetImpl.java | 3 +-
.../fsdataset/impl/TestWriteToReplica.java | 20 +++--
11 files changed, 241 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/81cf4ca6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
index 4eb4c52..748edcd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
@@ -57,4 +57,8 @@ public class DFSClientFaultInjector {
public void fetchFromDatanodeException() {}
public void readFromDatanodeDelay() {}
+
+ public boolean skipRollingRestartWait() {
+ return false;
+ }
}
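The new skipRollingRestartWait() hook lets a test force the client to treat every restarting datanode as remote, so the streamer never blocks waiting out a local rolling restart. A minimal sketch of installing it, mirroring the test added later in this commit:

    // Install a fault injector that marks all restarting nodes as remote.
    DFSClientFaultInjector old = DFSClientFaultInjector.get();
    DFSClientFaultInjector.set(new DFSClientFaultInjector() {
      @Override
      public boolean skipRollingRestartWait() {
        return true;
      }
    });
    try {
      // ... write data and exercise pipeline recovery ...
    } finally {
      // Always restore the default injector.
      DFSClientFaultInjector.set(old);
    }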
http://git-wip-us.apache.org/repos/asf/hadoop/blob/81cf4ca6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 49c17b9..f5ce0ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -327,6 +327,7 @@ class DataStreamer extends Daemon {
static class ErrorState {
ErrorType error = ErrorType.NONE;
private int badNodeIndex = -1;
+ private boolean waitForRestart = true;
private int restartingNodeIndex = -1;
private long restartingNodeDeadline = 0;
private final long datanodeRestartTimeout;
@@ -342,6 +343,7 @@ class DataStreamer extends Daemon {
badNodeIndex = -1;
restartingNodeIndex = -1;
restartingNodeDeadline = 0;
+ waitForRestart = true;
}
synchronized void reset() {
@@ -349,6 +351,7 @@ class DataStreamer extends Daemon {
badNodeIndex = -1;
restartingNodeIndex = -1;
restartingNodeDeadline = 0;
+ waitForRestart = true;
}
synchronized boolean hasInternalError() {
@@ -389,14 +392,19 @@ class DataStreamer extends Daemon {
return restartingNodeIndex;
}
- synchronized void initRestartingNode(int i, String message) {
+ synchronized void initRestartingNode(int i, String message,
+ boolean shouldWait) {
restartingNodeIndex = i;
- restartingNodeDeadline = Time.monotonicNow() + datanodeRestartTimeout;
- // If the data streamer has already set the primary node
- // bad, clear it. It is likely that the write failed due to
- // the DN shutdown. Even if it was a real failure, the pipeline
- // recovery will take care of it.
- badNodeIndex = -1;
+ if (shouldWait) {
+ restartingNodeDeadline = Time.monotonicNow() + datanodeRestartTimeout;
+ // If the data streamer has already set the primary node
+ // bad, clear it. It is likely that the write failed due to
+ // the DN shutdown. Even if it was a real failure, the pipeline
+ // recovery will take care of it.
+ badNodeIndex = -1;
+ } else {
+ this.waitForRestart = false;
+ }
LOG.info(message);
}
@@ -405,7 +413,7 @@ class DataStreamer extends Daemon {
}
synchronized boolean isNodeMarked() {
- return badNodeIndex >= 0 || isRestartingNode();
+ return badNodeIndex >= 0 || (isRestartingNode() && doWaitForRestart());
}
/**
@@ -430,7 +438,7 @@ class DataStreamer extends Daemon {
} else if (badNodeIndex < restartingNodeIndex) {
// the node index has shifted.
restartingNodeIndex--;
- } else {
+ } else if (waitForRestart) {
throw new IllegalStateException("badNodeIndex = " + badNodeIndex
+ " = restartingNodeIndex = " + restartingNodeIndex);
}
@@ -472,6 +480,10 @@ class DataStreamer extends Daemon {
}
}
}
+
+ boolean doWaitForRestart() {
+ return waitForRestart;
+ }
}
private volatile boolean streamerClosed = false;
@@ -491,6 +503,8 @@ class DataStreamer extends Daemon {
/** Nodes have been used in the pipeline before and have failed. */
private final List<DatanodeInfo> failed = new ArrayList<>();
+ /** Nodes that are currently restarting. */
+ private List<DatanodeInfo> restartingNodes = new ArrayList<>();
/** The times have retried to recover pipeline, for the same packet. */
private volatile int pipelineRecoveryCount = 0;
/** Has the current block been hflushed? */
@@ -1043,6 +1057,13 @@ class DataStreamer extends Daemon {
return true;
}
+ /*
+ * Treat all nodes as remote in tests when the skip is enabled.
+ */
+ if (DFSClientFaultInjector.get().skipRollingRestartWait()) {
+ return false;
+ }
+
// Is it a local node?
InetAddress addr = null;
try {
@@ -1110,11 +1131,11 @@ class DataStreamer extends Daemon {
}
// Restart will not be treated differently unless it is
// the local node or the only one in the pipeline.
- if (PipelineAck.isRestartOOBStatus(reply) &&
- shouldWaitForRestart(i)) {
+ if (PipelineAck.isRestartOOBStatus(reply)) {
final String message = "Datanode " + i + " is restarting: "
+ targets[i];
- errorState.initRestartingNode(i, message);
+ errorState.initRestartingNode(i, message,
+ shouldWaitForRestart(i));
throw new IOException(message);
}
// node error
@@ -1492,6 +1513,14 @@ class DataStreamer extends Daemon {
*/
boolean handleRestartingDatanode() {
if (errorState.isRestartingNode()) {
+ if (!errorState.doWaitForRestart()) {
+ // If the node is restarting but not worth waiting for, go ahead
+ // with error recovery and treat it as a bad node for now. The same
+ // node can be reconsidered for future pipeline updates.
+ errorState.setBadNodeIndex(errorState.getRestartingNodeIndex());
+ return true;
+ }
// 4 seconds or the configured deadline period, whichever is shorter.
// This is the retry interval and recovery will be retried in this
// interval until timeout or success.
@@ -1523,9 +1552,14 @@ class DataStreamer extends Daemon {
return false;
}
+ String reason = "bad.";
+ if (errorState.getRestartingNodeIndex() == badNodeIndex) {
+ reason = "restarting.";
+ restartingNodes.add(nodes[badNodeIndex]);
+ }
LOG.warn("Error Recovery for " + block + " in pipeline "
+ Arrays.toString(nodes) + ": datanode " + badNodeIndex
- + "("+ nodes[badNodeIndex] + ") is bad.");
+ + "("+ nodes[badNodeIndex] + ") is " + reason);
failed.add(nodes[badNodeIndex]);
DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
@@ -1735,6 +1769,9 @@ class DataStreamer extends Daemon {
blockStream = out;
result = true; // success
errorState.resetInternalError();
+ // remove all restarting nodes from failed nodes list
+ failed.removeAll(restartingNodes);
+ restartingNodes.clear();
} catch (IOException ie) {
if (!errorState.isRestartingNode()) {
LOG.info("Exception in createBlockOutputStream " + this, ie);
@@ -1768,9 +1805,10 @@ class DataStreamer extends Daemon {
final int i = errorState.getBadNodeIndex();
// Check whether there is a restart worth waiting for.
- if (checkRestart && shouldWaitForRestart(i)) {
- errorState.initRestartingNode(i, "Datanode " + i + " is restarting: "
- + nodes[i]);
+ if (checkRestart) {
+ errorState.initRestartingNode(i,
+ "Datanode " + i + " is restarting: " + nodes[i],
+ shouldWaitForRestart(i));
}
errorState.setInternalError();
lastException.set(ie);
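Taken together, the DataStreamer changes decouple "node is restarting" from "wait for the restart": a restart OOB ack now always records the restarting node, and shouldWaitForRestart() only decides whether the streamer waits (local node) or proceeds with recovery immediately (remote node). Remote restarting nodes are remembered in restartingNodes and dropped from the failed list once a new pipeline is up, so future pipeline updates can re-add them. A condensed view, using only names from the hunks above:

    // In handleRestartingDatanode(): a remote restart does not block the
    // writer; the node is treated as bad for this recovery round only.
    if (errorState.isRestartingNode() && !errorState.doWaitForRestart()) {
      errorState.setBadNodeIndex(errorState.getRestartingNodeIndex());
    }

    // In createBlockOutputStream(), once the new pipeline succeeds:
    failed.removeAll(restartingNodes); // eligible for future pipelines again
    restartingNodes.clear();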
http://git-wip-us.apache.org/repos/asf/hadoop/blob/81cf4ca6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 2ab4067..c5462a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -207,7 +207,7 @@ class BlockReceiver implements Closeable {
//
if (isDatanode) { //replication or move
replicaHandler =
- datanode.data.createTemporary(storageType, storageId, block);
+ datanode.data.createTemporary(storageType, storageId, block, false);
} else {
switch (stage) {
case PIPELINE_SETUP_CREATE:
@@ -236,8 +236,8 @@ class BlockReceiver implements Closeable {
case TRANSFER_RBW:
case TRANSFER_FINALIZED:
// this is a transfer destination
- replicaHandler =
- datanode.data.createTemporary(storageType, storageId, block);
+ replicaHandler = datanode.data.createTemporary(storageType, storageId,
+ block, isTransfer);
break;
default: throw new IOException("Unsupported stage " + stage +
" while receiving block " + block + " from " + inAddr);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/81cf4ca6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index d7e29cf..fd3af5d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -319,7 +319,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* @throws IOException if an error occurs
*/
ReplicaHandler createTemporary(StorageType storageType, String storageId,
- ExtendedBlock b) throws IOException;
+ ExtendedBlock b, boolean isTransfer) throws IOException;
/**
* Creates a RBW replica and returns the meta info of the replica
http://git-wip-us.apache.org/repos/asf/hadoop/blob/81cf4ca6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index 416609d..9174cb0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -227,7 +227,19 @@ class FsDatasetAsyncDiskService {
volumeRef, replicaToDelete, block, trashDirectory);
execute(((FsVolumeImpl) volumeRef.getVolume()), deletionTask);
}
-
+
+ /**
+ * Delete the block file and meta file from the disk synchronously, and
+ * adjust the dfsUsed statistics accordingly.
+ */
+ void deleteSync(FsVolumeReference volumeRef, ReplicaInfo replicaToDelete,
+ ExtendedBlock block, String trashDirectory) {
+ LOG.info("Deleting " + block.getLocalBlock() + " replica " + replicaToDelete);
+ ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(volumeRef,
+ replicaToDelete, block, trashDirectory);
+ deletionTask.run();
+ }
+
/** A task for deleting a block file and its associated meta file, as well
* as decrement the dfs usage of the volume.
* Optionally accepts a trash directory. If one is specified then the files
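deleteSync() reuses the existing ReplicaFileDeleteTask but runs it inline on the caller's thread rather than submitting it to the per-volume executor, so the caller returns only after the block and meta files are actually gone. FsDatasetImpl below invokes it exactly like deleteAsync():

    asyncDiskService.deleteSync(v.obtainReference(), removing,
        new ExtendedBlock(bpid, invalidBlks[i]),
        dataStorage.getTrashDirectoryForReplica(bpid, removing));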
http://git-wip-us.apache.org/repos/asf/hadoop/blob/81cf4ca6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index e7d4d25..eb4455b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1504,37 +1504,29 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override // FsDatasetSpi
- public ReplicaHandler createTemporary(
- StorageType storageType, String storageId, ExtendedBlock b)
+ public ReplicaHandler createTemporary(StorageType storageType,
+ String storageId, ExtendedBlock b, boolean isTransfer)
throws IOException {
long startTimeMs = Time.monotonicNow();
long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
ReplicaInfo lastFoundReplicaInfo = null;
+ boolean isInPipeline = false;
do {
try (AutoCloseableLock lock = datasetLock.acquire()) {
ReplicaInfo currentReplicaInfo =
volumeMap.get(b.getBlockPoolId(), b.getBlockId());
if (currentReplicaInfo == lastFoundReplicaInfo) {
- if (lastFoundReplicaInfo != null) {
- invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo });
- }
- FsVolumeReference ref =
- volumes.getNextVolume(storageType, storageId, b.getNumBytes());
- FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
- ReplicaInPipeline newReplicaInfo;
- try {
- newReplicaInfo = v.createTemporary(b);
- } catch (IOException e) {
- IOUtils.cleanup(null, ref);
- throw e;
- }
-
- volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
- return new ReplicaHandler(newReplicaInfo, ref);
+ break;
} else {
- if (!(currentReplicaInfo.getGenerationStamp() < b.getGenerationStamp()
- && (currentReplicaInfo.getState() == ReplicaState.TEMPORARY
- || currentReplicaInfo.getState() == ReplicaState.RBW))) {
+ isInPipeline = currentReplicaInfo.getState() == ReplicaState.TEMPORARY
+ || currentReplicaInfo.getState() == ReplicaState.RBW;
+ /*
+ * Reject if the existing replica's generation stamp is not older
+ * than the incoming block's. Otherwise accept transfer requests,
+ * and accept other requests only if the replica is RBW/TEMPORARY.
+ */
+ if ((currentReplicaInfo.getGenerationStamp() >= b.getGenerationStamp())
+ || (!isTransfer && !isInPipeline)) {
throw new ReplicaAlreadyExistsException("Block " + b
+ " already exists in state " + currentReplicaInfo.getState()
+ " and thus cannot be created.");
@@ -1542,7 +1534,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
lastFoundReplicaInfo = currentReplicaInfo;
}
}
-
+ if (!isInPipeline) {
+ continue;
+ }
// Hang too long, just bail out. This is not supposed to happen.
long writerStopMs = Time.monotonicNow() - startTimeMs;
if (writerStopMs > writerStopTimeoutMs) {
@@ -1555,6 +1549,29 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Stop the previous writer
((ReplicaInPipeline)lastFoundReplicaInfo).stopWriter(writerStopTimeoutMs);
} while (true);
+
+ if (lastFoundReplicaInfo != null) {
+ // The old block file must be deleted synchronously, since it could
+ // collide with the new block if both are allocated on the same volume.
+ // Do the deletion outside of the lock, as it is disk IO.
+ invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo },
+ false);
+ }
+ try (AutoCloseableLock lock = datasetLock.acquire()) {
+ FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b
+ .getNumBytes());
+ FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
+ ReplicaInPipeline newReplicaInfo;
+ try {
+ newReplicaInfo = v.createTemporary(b);
+ } catch (IOException e) {
+ IOUtils.cleanup(null, ref);
+ throw e;
+ }
+
+ volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
+ return new ReplicaHandler(newReplicaInfo, ref);
+ }
}
/**
@@ -1877,6 +1894,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/
@Override // FsDatasetSpi
public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
+ invalidate(bpid, invalidBlks, true);
+ }
+
+ private void invalidate(String bpid, Block[] invalidBlks, boolean async)
+ throws IOException {
final List<String> errors = new ArrayList<String>();
for (int i = 0; i < invalidBlks.length; i++) {
final ReplicaInfo removing;
@@ -1947,13 +1969,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// If the block is cached, start uncaching it.
cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
- // Delete the block asynchronously to make sure we can do it fast enough.
- // It's ok to unlink the block file before the uncache operation
- // finishes.
try {
- asyncDiskService.deleteAsync(v.obtainReference(), removing,
- new ExtendedBlock(bpid, invalidBlks[i]),
- dataStorage.getTrashDirectoryForReplica(bpid, removing));
+ if (async) {
+ // Delete the block asynchronously to make sure we can do it
+ // fast enough. It's ok to unlink the block file before the
+ // uncache operation finishes.
+ asyncDiskService.deleteAsync(v.obtainReference(), removing,
+ new ExtendedBlock(bpid, invalidBlks[i]),
+ dataStorage.getTrashDirectoryForReplica(bpid, removing));
+ } else {
+ asyncDiskService.deleteSync(v.obtainReference(), removing,
+ new ExtendedBlock(bpid, invalidBlks[i]),
+ dataStorage.getTrashDirectoryForReplica(bpid, removing));
+ }
} catch (ClosedChannelException e) {
LOG.warn("Volume " + v + " is closed, ignore the deletion task for " +
"block " + invalidBlks[i]);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/81cf4ca6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index 1a640b4..0212c4e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -17,9 +17,11 @@
*/
package org.apache.hadoop.hdfs;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
@@ -33,6 +35,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
@@ -437,6 +440,95 @@ public class TestClientProtocolForPipelineRecovery {
}
}
+ @Test
+ public void testPipelineRecoveryOnRemoteDatanodeUpgrade() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY, true);
+ MiniDFSCluster cluster = null;
+ DFSClientFaultInjector old = DFSClientFaultInjector.get();
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+ cluster.waitActive();
+ FileSystem fileSys = cluster.getFileSystem();
+
+ Path file = new Path("/testPipelineRecoveryOnDatanodeUpgrade");
+ DFSTestUtil.createFile(fileSys, file, 10240L, (short) 3, 0L);
+ // treat all restarting nodes as remote for test.
+ DFSClientFaultInjector.set(new DFSClientFaultInjector() {
+ public boolean skipRollingRestartWait() {
+ return true;
+ }
+ });
+
+ final DFSOutputStream out = (DFSOutputStream) fileSys.append(file)
+ .getWrappedStream();
+ final AtomicBoolean running = new AtomicBoolean(true);
+ final AtomicBoolean failed = new AtomicBoolean(false);
+ Thread t = new Thread() {
+ public void run() {
+ while (running.get()) {
+ try {
+ out.write("test".getBytes());
+ out.hflush();
+ // Keep writing data every second
+ Thread.sleep(1000);
+ } catch (IOException | InterruptedException e) {
+ LOG.error("Exception during write", e);
+ failed.set(true);
+ break;
+ }
+ }
+ running.set(false);
+ }
+ };
+ t.start();
+ // Let write start
+ Thread.sleep(1000);
+ DatanodeInfo[] pipeline = out.getPipeline();
+ for (DatanodeInfo node : pipeline) {
+ assertFalse("Write should be going on", failed.get());
+ ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+ int indexToShutdown = 0;
+ for (int i = 0; i < dataNodes.size(); i++) {
+ if (dataNodes.get(i).getIpcPort() == node.getIpcPort()) {
+ indexToShutdown = i;
+ break;
+ }
+ }
+
+ // Note the old genstamp so pipeline recovery can be detected
+ final long oldGs = out.getBlock().getGenerationStamp();
+ MiniDFSCluster.DataNodeProperties dnProps = cluster
+ .stopDataNodeForUpgrade(indexToShutdown);
+ GenericTestUtils.waitForThreadTermination(
+ "Async datanode shutdown thread", 100, 10000);
+ cluster.restartDataNode(dnProps, true);
+ cluster.waitActive();
+ // wait for the pipeline to be recovered
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return out.getBlock().getGenerationStamp() > oldGs;
+ }
+ }, 100, 10000);
+ Assert.assertEquals("The pipeline recovery count shouldn't increase", 0,
+ out.getStreamer().getPipelineRecoveryCount());
+ }
+ assertFalse("Write should be going on", failed.get());
+ running.set(false);
+ t.join();
+ out.write("testagain".getBytes());
+ assertTrue("There should be atleast 2 nodes in pipeline still", out
+ .getPipeline().length >= 2);
+ out.close();
+ } finally {
+ DFSClientFaultInjector.set(old);
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
/**
* Test to make sure the checksum is set correctly after pipeline
* recovery transfers 0 byte partial block. If fails the test case
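The new testPipelineRecoveryOnRemoteDatanodeUpgrade keeps a writer appending and hflushing while each datanode in the pipeline is stopped for upgrade and restarted in turn. With the fault injector marking every restart as remote, each restart is verified by waiting for a generation-stamp bump, i.e. a pipeline recovery that completed without blocking the writer:

    final long oldGs = out.getBlock().getGenerationStamp();
    cluster.restartDataNode(dnProps, true);
    cluster.waitActive();
    // A genstamp bump shows recovery ran; the recovery count staying at 0
    // shows no packet had to be retried.
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return out.getBlock().getGenerationStamp() > oldGs;
      }
    }, 100, 10000);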
http://git-wip-us.apache.org/repos/asf/hadoop/blob/81cf4ca6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 18b4922..afa7a82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1025,12 +1025,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public synchronized ReplicaHandler createRbw(
StorageType storageType, String storageId, ExtendedBlock b,
boolean allowLazyPersist) throws IOException {
- return createTemporary(storageType, storageId, b);
+ return createTemporary(storageType, storageId, b, false);
}
@Override // FsDatasetSpi
- public synchronized ReplicaHandler createTemporary(
- StorageType storageType, String storageId, ExtendedBlock b)
+ public synchronized ReplicaHandler createTemporary(StorageType storageType,
+ String storageId, ExtendedBlock b, boolean isTransfer)
throws IOException {
if (isValidBlock(b)) {
throw new ReplicaAlreadyExistsException("Block " + b +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/81cf4ca6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
index 2e69595..469e249b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
@@ -368,7 +368,7 @@ public class TestSimulatedFSDataset {
ExtendedBlock block = new ExtendedBlock(newbpid,1);
try {
// it will throw an exception if the block pool is not found
- fsdataset.createTemporary(StorageType.DEFAULT, null, block);
+ fsdataset.createTemporary(StorageType.DEFAULT, null, block, false);
} catch (IOException ioe) {
// JUnit does not capture exception in non-main thread,
// so cache it and then let main thread throw later.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/81cf4ca6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 2e439d6..d14bd72 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -139,8 +139,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
@Override
public ReplicaHandler createTemporary(StorageType t, String i,
- ExtendedBlock b)
- throws IOException {
+ ExtendedBlock b, boolean isTransfer) throws IOException {
return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/81cf4ca6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index 11525ed..657e618 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -435,44 +435,48 @@ public class TestWriteToReplica {
private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
try {
- dataSet.createTemporary(StorageType.DEFAULT, null, blocks[FINALIZED]);
+ dataSet.createTemporary(StorageType.DEFAULT, null, blocks[FINALIZED],
+ false);
Assert.fail("Should not have created a temporary replica that was " +
"finalized " + blocks[FINALIZED]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
- dataSet.createTemporary(StorageType.DEFAULT, null, blocks[TEMPORARY]);
+ dataSet.createTemporary(StorageType.DEFAULT, null, blocks[TEMPORARY],
+ false);
Assert.fail("Should not have created a replica that had created as" +
"temporary " + blocks[TEMPORARY]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
- dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RBW]);
+ dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RBW], false);
Assert.fail("Should not have created a replica that had created as RBW " +
blocks[RBW]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
- dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RWR]);
+ dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RWR], false);
Assert.fail("Should not have created a replica that was waiting to be " +
"recovered " + blocks[RWR]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
- dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RUR]);
+ dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RUR], false);
Assert.fail("Should not have created a replica that was under recovery " +
blocks[RUR]);
} catch (ReplicaAlreadyExistsException e) {
}
- dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT]);
+ dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT],
+ false);
try {
- dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT]);
+ dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT],
+ false);
Assert.fail("Should not have created a replica that had already been "
+ "created " + blocks[NON_EXISTENT]);
} catch (Exception e) {
@@ -486,7 +490,7 @@ public class TestWriteToReplica {
try {
ReplicaInPipeline replicaInfo =
dataSet.createTemporary(StorageType.DEFAULT, null,
- blocks[NON_EXISTENT]).getReplica();
+ blocks[NON_EXISTENT], false).getReplica();
Assert.assertTrue(replicaInfo.getGenerationStamp() == newGenStamp);
Assert.assertTrue(
replicaInfo.getBlockId() == blocks[NON_EXISTENT].getBlockId());