Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2017/06/01 21:46:48 UTC
hadoop git commit: HDFS-11856. Ability to re-add upgrading nodes to pipeline for future pipeline updates. Contributed by Vinayakumar B.
Repository: hadoop
Updated Branches:
refs/heads/branch-2.7 01cdea732 -> d3b86234b
HDFS-11856. Ability to re-add upgrading nodes to pipeline for future pipeline updates. Contributed by Vinayakumar B.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d3b86234
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d3b86234
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d3b86234
Branch: refs/heads/branch-2.7
Commit: d3b86234b29031fda2805a805705d336a179a816
Parents: 01cdea7
Author: Kihwal Lee <ki...@apache.org>
Authored: Thu Jun 1 16:45:55 2017 -0500
Committer: Kihwal Lee <ki...@apache.org>
Committed: Thu Jun 1 16:45:55 2017 -0500
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hadoop/hdfs/DFSClientFaultInjector.java | 8 ++
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 80 +++++++++-----
.../hdfs/server/datanode/BlockReceiver.java | 5 +-
.../server/datanode/fsdataset/FsDatasetSpi.java | 5 +-
.../impl/FsDatasetAsyncDiskService.java | 12 +++
.../datanode/fsdataset/impl/FsDatasetImpl.java | 91 ++++++++++------
.../TestClientProtocolForPipelineRecovery.java | 103 ++++++++++++++++++-
.../server/datanode/SimulatedFSDataset.java | 4 +-
.../extdataset/ExternalDatasetImpl.java | 2 +-
.../fsdataset/impl/TestWriteToReplica.java | 18 ++--
11 files changed, 260 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b86234/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index bac3499..2538699 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -340,6 +340,9 @@ Release 2.7.4 - UNRELEASED
HDFS-5042. Completed files lost after power failure. (vinayakumarb via kihwal)
+ HDFS-11856. Ability to re-add upgrading nodes to pipeline for future
+ pipeline updates. (vinayakumarb via kihwal)
+
Release 2.7.3 - 2016-08-25
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b86234/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
index 5392c66..974fb22 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
@@ -33,6 +33,10 @@ public class DFSClientFaultInjector {
public static DFSClientFaultInjector instance = new DFSClientFaultInjector();
public static AtomicLong exceptionNum = new AtomicLong(0);
+ public static void set(DFSClientFaultInjector dfsClientFaultInjector) {
+ instance = dfsClientFaultInjector;
+ }
+
public static DFSClientFaultInjector get() {
return instance;
}
@@ -54,4 +58,8 @@ public class DFSClientFaultInjector {
public void fetchFromDatanodeException() {}
public void readFromDatanodeDelay() {}
+
+ public boolean skipRollingRestartWait() {
+ return false;
+ }
}
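For reference, the new set() hook is what lets a test install its own injector, and skipRollingRestartWait() is the switch the client consults. A minimal sketch of the pattern, assuming ordinary JUnit test scaffolding around it (the same idea appears in the test added later in this commit):

    // Install a test injector: returning true from skipRollingRestartWait()
    // makes the client treat every restarting datanode as remote.
    DFSClientFaultInjector previous = DFSClientFaultInjector.get();
    DFSClientFaultInjector.set(new DFSClientFaultInjector() {
      @Override
      public boolean skipRollingRestartWait() {
        return true;
      }
    });
    try {
      // ... exercise a write pipeline while a datanode restarts ...
    } finally {
      // Restore the default so later tests see the stock behavior.
      DFSClientFaultInjector.set(previous);
    }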
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b86234/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 188502f..1289b30 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -346,6 +346,7 @@ public class DFSOutputStream extends FSOutputSummer
volatile int errorIndex = -1;
// Restarting node index
AtomicInteger restartingNodeIndex = new AtomicInteger(-1);
+ volatile boolean waitForRestart = true;
private long restartDeadline = 0; // Deadline of DN restart
private BlockConstructionStage stage; // block construction stage
private long bytesSent = 0; // number of bytes that've been sent
@@ -353,6 +354,8 @@ public class DFSOutputStream extends FSOutputSummer
/** Nodes that have been used in the pipeline before and have failed. */
private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
+ /** Restarting Nodes */
+ private final List<DatanodeInfo> restartingNodes = new ArrayList<>();
/** The number of times pipeline recovery has been retried for the same packet. */
private volatile int pipelineRecoveryCount = 0;
/** Has the current block been hflushed? */
@@ -792,6 +795,13 @@ public class DFSOutputStream extends FSOutputSummer
return true;
}
+ /*
+ * Treat all nodes as remote for testing when the skip is enabled.
+ */
+ if (DFSClientFaultInjector.get().skipRollingRestartWait()) {
+ return false;
+ }
+
// Is it a local node?
InetAddress addr = null;
try {
@@ -852,10 +862,14 @@ public class DFSOutputStream extends FSOutputSummer
.getHeaderFlag(i));
// Restart will not be treated differently unless it is
// the local node or the only one in the pipeline.
- if (PipelineAck.isRestartOOBStatus(reply) &&
- shouldWaitForRestart(i)) {
- restartDeadline = dfsClient.getConf().datanodeRestartTimeout
- + Time.monotonicNow();
+ if (PipelineAck.isRestartOOBStatus(reply)) {
+ if (shouldWaitForRestart(i)) {
+ restartDeadline = dfsClient.getConf().datanodeRestartTimeout
+ + Time.monotonicNow();
+ waitForRestart = true;
+ } else {
+ waitForRestart = false;
+ }
setRestartingNodeIndex(i);
String message = "A datanode is restarting: " + targets[i];
DFSClient.LOG.info(message);
@@ -1171,18 +1185,24 @@ public class DFSOutputStream extends FSOutputSummer
// This process will be repeated until the deadline or the datanode
// starts back up.
if (restartingNodeIndex.get() >= 0) {
- // 4 seconds or the configured deadline period, whichever is shorter.
- // This is the retry interval and recovery will be retried in this
- // interval until timeout or success.
- long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout,
- 4000L);
- try {
- Thread.sleep(delay);
- } catch (InterruptedException ie) {
- lastException.set(new IOException("Interrupted while waiting for " +
- "datanode to restart. " + nodes[restartingNodeIndex.get()]));
- streamerClosed = true;
- return false;
+ if (!waitForRestart) {
+ setErrorIndex(restartingNodeIndex.get());
+ } else {
+ // 4 seconds or the configured deadline period, whichever is
+ // shorter.
+ // This is the retry interval and recovery will be retried in this
+ // interval until timeout or success.
+ long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout,
+ 4000L);
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException ie) {
+ lastException.set(new IOException("Interrupted while waiting for "
+ + "datanode to restart. " + nodes[restartingNodeIndex
+ .get()]));
+ streamerClosed = true;
+ return false;
+ }
}
}
boolean isRecovery = hasError;
@@ -1204,9 +1224,14 @@ public class DFSOutputStream extends FSOutputSummer
streamerClosed = true;
return false;
}
- DFSClient.LOG.warn("Error Recovery for block " + block +
- " in pipeline " + pipelineMsg +
- ": bad datanode " + nodes[errorIndex]);
+ String reason = "bad.";
+ if (restartingNodeIndex.get() == errorIndex) {
+ reason = "restarting.";
+ restartingNodes.add(nodes[errorIndex]);
+ }
+ DFSClient.LOG.warn("Error Recovery for block " + block
+ + " in pipeline " + pipelineMsg + ": datanode " + errorIndex + "("
+ + nodes[errorIndex] + ") is " + reason);
failed.add(nodes[errorIndex]);
DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
@@ -1229,7 +1254,7 @@ public class DFSOutputStream extends FSOutputSummer
} else if (errorIndex < restartingNodeIndex.get()) {
// the node index has shifted.
restartingNodeIndex.decrementAndGet();
- } else {
+ } else if (waitForRestart) {
// this shouldn't happen...
assert false;
}
@@ -1458,7 +1483,11 @@ public class DFSOutputStream extends FSOutputSummer
blockStream = out;
result = true; // success
restartingNodeIndex.set(-1);
+ waitForRestart = true;
hasError = false;
+ // remove all restarting nodes from failed nodes list
+ failed.removeAll(restartingNodes);
+ restartingNodes.clear();
} catch (IOException ie) {
if (restartingNodeIndex.get() == -1) {
DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
@@ -1489,9 +1518,14 @@ public class DFSOutputStream extends FSOutputSummer
errorIndex = 0;
}
// Check whether there is a restart worth waiting for.
- if (checkRestart && shouldWaitForRestart(errorIndex)) {
- restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
- Time.monotonicNow();
+ if (checkRestart) {
+ if (shouldWaitForRestart(errorIndex)) {
+ restartDeadline = dfsClient.getConf().datanodeRestartTimeout
+ + Time.monotonicNow();
+ waitForRestart = true;
+ } else {
+ waitForRestart = false;
+ }
restartingNodeIndex.set(errorIndex);
errorIndex = -1;
DFSClient.LOG.info("Waiting for the datanode to be restarted: " +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b86234/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 45b6ce2..5e5e777 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -194,7 +194,8 @@ class BlockReceiver implements Closeable {
// Open local disk out
//
if (isDatanode) { //replication or move
- replicaHandler = datanode.data.createTemporary(storageType, block);
+ replicaHandler = datanode.data.createTemporary(storageType, block,
+ false);
} else {
switch (stage) {
case PIPELINE_SETUP_CREATE:
@@ -223,7 +224,7 @@ class BlockReceiver implements Closeable {
case TRANSFER_FINALIZED:
// this is a transfer destination
replicaHandler =
- datanode.data.createTemporary(storageType, block);
+ datanode.data.createTemporary(storageType, block, isTransfer);
break;
default: throw new IOException("Unsupported stage " + stage +
" while receiving block " + block + " from " + inAddr);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b86234/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 8378725..ea9bd02 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -211,13 +211,14 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
/**
* Creates a temporary replica and returns the meta information of the replica
- *
* @param b block
+ * @param isTransfer whether the replica is being created for a block transfer
+ *
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
public ReplicaHandler createTemporary(StorageType storageType,
- ExtendedBlock b) throws IOException;
+ ExtendedBlock b, boolean isTransfer) throws IOException;
/**
* Creates a RBW replica and returns the meta info of the replica
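FsDatasetSpi is a public extension point, so out-of-tree datasets must adopt the widened signature along with the BlockReceiver call sites above. A minimal migration sketch, where createTemporaryImpl is a hypothetical internal helper:

    @Override
    public ReplicaHandler createTemporary(StorageType storageType,
        ExtendedBlock b, boolean isTransfer) throws IOException {
      // A dataset that never allows re-creating over an existing replica
      // can ignore the flag and keep its old behavior.
      return createTemporaryImpl(storageType, b);
    }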
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b86234/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index c1d3990..e6e5824 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -220,6 +220,18 @@ class FsDatasetAsyncDiskService {
volumeRef, blockFile, metaFile, block, trashDirectory);
execute(((FsVolumeImpl) volumeRef.getVolume()).getCurrentDir(), deletionTask);
}
+
+ /**
+ * Delete the block file and meta file from the disk synchronously, and
+ * adjust the dfsUsed statistics accordingly.
+ */
+ void deleteSync(FsVolumeReference volumeRef, File blockFile, File metaFile,
+ ExtendedBlock block, String trashDirectory) {
+ LOG.info("Deleting " + block.getLocalBlock() + " file " + blockFile);
+ ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(
+ volumeRef, blockFile, metaFile, block, trashDirectory);
+ deletionTask.run();
+ }
/** A task for deleting a block file and its associated meta file, as well
* as decrementing the dfs usage of the volume.
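deleteSync() reuses the same ReplicaFileDeleteTask as deleteAsync(), so the dfsUsed accounting is identical; the task simply runs on the calling thread instead of on the executor. A sketch of how a caller picks between the two, mirroring the invalidate() change below (names follow the diff; invalidBlk stands in for invalidBlks[i]):

    if (async) {
      // Normal invalidation: hand the files to the async disk service.
      asyncDiskService.deleteAsync(v.obtainReference(), blockFile, metaFile,
          new ExtendedBlock(bpid, invalidBlk), trashDirectory);
    } else {
      // Re-creation path: delete on this thread so a replacement replica
      // allocated in the same volume cannot collide with the old files.
      asyncDiskService.deleteSync(v.obtainReference(), blockFile, metaFile,
          new ExtendedBlock(bpid, invalidBlk), trashDirectory);
    }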
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b86234/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 76867b1..1886590 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1466,38 +1466,28 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override // FsDatasetSpi
- public ReplicaHandler createTemporary(
- StorageType storageType, ExtendedBlock b) throws IOException {
+ public ReplicaHandler createTemporary(StorageType storageType,
+ ExtendedBlock b, boolean isTransfer) throws IOException {
long startTimeMs = Time.monotonicNow();
long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
ReplicaInfo lastFoundReplicaInfo = null;
+ boolean isInPipeline = false;
do {
synchronized (this) {
ReplicaInfo currentReplicaInfo =
volumeMap.get(b.getBlockPoolId(), b.getBlockId());
if (currentReplicaInfo == lastFoundReplicaInfo) {
- if (lastFoundReplicaInfo != null) {
- invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo });
- }
- FsVolumeReference ref =
- volumes.getNextVolume(storageType, b.getNumBytes());
- FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
- // create a temporary file to hold block in the designated volume
- File f;
- try {
- f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
- } catch (IOException e) {
- IOUtils.cleanup(null, ref);
- throw e;
- }
- ReplicaInPipeline newReplicaInfo =
- new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v,
- f.getParentFile(), 0);
- volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
- return new ReplicaHandler(newReplicaInfo, ref);
+ break;
} else {
- if (!(currentReplicaInfo.getGenerationStamp() < b
- .getGenerationStamp() && currentReplicaInfo instanceof ReplicaInPipeline)) {
+ isInPipeline = currentReplicaInfo.getState() == ReplicaState.TEMPORARY
+ || currentReplicaInfo.getState() == ReplicaState.RBW;
+ /*
+ * If the incoming block's generation stamp is not newer, reject it.
+ * Else, if this is a transfer request, accept it.
+ * Else, if the existing replica is not RBW/Temporary, reject it.
+ */
+ if ((currentReplicaInfo.getGenerationStamp() >= b.getGenerationStamp())
+ || (!isTransfer && !isInPipeline)) {
throw new ReplicaAlreadyExistsException("Block " + b
+ " already exists in state " + currentReplicaInfo.getState()
+ " and thus cannot be created.");
@@ -1506,6 +1496,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
+ if (!isInPipeline) {
+ continue;
+ }
// Hang too long, just bail out. This is not supposed to happen.
long writerStopMs = Time.monotonicNow() - startTimeMs;
if (writerStopMs > writerStopTimeoutMs) {
@@ -1519,6 +1512,31 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
((ReplicaInPipeline) lastFoundReplicaInfo)
.stopWriter(writerStopTimeoutMs);
} while (true);
+
+ if (lastFoundReplicaInfo != null) {
+ // The old block file should be deleted synchronously, as it might
+ // collide with the new block if both are allocated in the same volume.
+ // Do the deletion outside of the lock, as it is disk I/O.
+ invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo },
+ false);
+ }
+ synchronized (this) {
+ FsVolumeReference ref = volumes.getNextVolume(storageType, b
+ .getNumBytes());
+ FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
+ // create a temporary file to hold block in the designated volume
+ File f;
+ try {
+ f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
+ } catch (IOException e) {
+ IOUtils.cleanup(null, ref);
+ throw e;
+ }
+ ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), b
+ .getGenerationStamp(), v, f.getParentFile(), 0);
+ volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
+ return new ReplicaHandler(newReplicaInfo, ref);
+ }
}
/**
@@ -1849,6 +1867,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/
@Override // FsDatasetSpi
public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
+ invalidate(bpid, invalidBlks, true);
+ }
+
+ private void invalidate(String bpid, Block[] invalidBlks, boolean async)
+ throws IOException {
final List<String> errors = new ArrayList<String>();
for (int i = 0; i < invalidBlks.length; i++) {
final File f;
@@ -1910,14 +1933,22 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// If the block is cached, start uncaching it.
cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
- // Delete the block asynchronously to make sure we can do it fast enough.
- // It's ok to unlink the block file before the uncache operation
- // finishes.
try {
- asyncDiskService.deleteAsync(v.obtainReference(), f,
- FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
- new ExtendedBlock(bpid, invalidBlks[i]),
- dataStorage.getTrashDirectoryForBlockFile(bpid, f));
+ // Delete the block asynchronously to make sure we can do it fast
+ // enough.
+ // It's ok to unlink the block file before the uncache operation
+ // finishes.
+ if (async) {
+ asyncDiskService.deleteAsync(v.obtainReference(), f,
+ FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
+ new ExtendedBlock(bpid, invalidBlks[i]),
+ dataStorage.getTrashDirectoryForBlockFile(bpid, f));
+ } else {
+ asyncDiskService.deleteSync(v.obtainReference(), f,
+ FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
+ new ExtendedBlock(bpid, invalidBlks[i]),
+ dataStorage.getTrashDirectoryForBlockFile(bpid, f));
+ }
} catch (ClosedChannelException e) {
LOG.warn("Volume " + v + " is closed, ignore the deletion task for " +
"block " + invalidBlks[i]);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b86234/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index 47fce0e..63a6f62 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -17,7 +17,12 @@
*/
package org.apache.hadoop.hdfs;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration;
@@ -39,13 +44,16 @@ import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
-import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Tests whether the pipeline recovery related client protocol works correctly.
*/
public class TestClientProtocolForPipelineRecovery {
-
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TestClientProtocolForPipelineRecovery.class);
+
@Test public void testGetNewStamp() throws IOException {
int numDataNodes = 1;
Configuration conf = new HdfsConfiguration();
@@ -428,4 +436,95 @@ public class TestClientProtocolForPipelineRecovery {
DataNodeFaultInjector.set(oldDnInjector);
}
}
+
+ @Test
+ public void testPipelineRecoveryOnRemoteDatanodeUpgrade() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(
+ DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY,
+ true);
+ MiniDFSCluster cluster = null;
+ DFSClientFaultInjector old = DFSClientFaultInjector.get();
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+ cluster.waitActive();
+ FileSystem fileSys = cluster.getFileSystem();
+
+ Path file = new Path("/testPipelineRecoveryOnDatanodeUpgrade");
+ DFSTestUtil.createFile(fileSys, file, 10240L, (short) 3, 0L);
+ // treat all restarting nodes as remote for test.
+ DFSClientFaultInjector.set(new DFSClientFaultInjector() {
+ public boolean skipRollingRestartWait() {
+ return true;
+ }
+ });
+
+ final DFSOutputStream out = (DFSOutputStream) fileSys.append(file)
+ .getWrappedStream();
+ final AtomicBoolean running = new AtomicBoolean(true);
+ final AtomicBoolean failed = new AtomicBoolean(false);
+ Thread t = new Thread() {
+ public void run() {
+ while (running.get()) {
+ try {
+ out.write("test".getBytes());
+ out.hflush();
+ // Keep writing data once per second
+ Thread.sleep(1000);
+ } catch (IOException | InterruptedException e) {
+ LOG.error("Exception during write", e);
+ failed.set(true);
+ break;
+ }
+ }
+ running.set(false);
+ }
+ };
+ t.start();
+ // Let write start
+ Thread.sleep(1000);
+ DatanodeInfo[] pipeline = out.getPipeline();
+ for (DatanodeInfo node : pipeline) {
+ assertFalse("Write should be going on", failed.get());
+ ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+ int indexToShutdown = 0;
+ for (int i = 0; i < dataNodes.size(); i++) {
+ if (dataNodes.get(i).getIpcPort() == node.getIpcPort()) {
+ indexToShutdown = i;
+ break;
+ }
+ }
+
+ // Note the old genstamp to detect pipeline recovery
+ final long oldGs = out.getBlock().getGenerationStamp();
+ MiniDFSCluster.DataNodeProperties dnProps = cluster
+ .stopDataNodeForUpgrade(indexToShutdown);
+ GenericTestUtils.waitForThreadTermination(
+ "Async datanode shutdown thread", 100, 10000);
+ cluster.restartDataNode(dnProps, true);
+ cluster.waitActive();
+ // wait for the pipeline to be recovered
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return out.getBlock().getGenerationStamp() > oldGs;
+ }
+ }, 100, 10000);
+ Assert.assertEquals("The pipeline recovery count shouldn't increase", 0,
+ out.getPipelineRecoveryCount());
+ }
+ assertFalse("Write should be going on", failed.get());
+ running.set(false);
+ t.join();
+ out.write("testagain".getBytes());
+ assertTrue("There should be atleast 2 nodes in pipeline still", out
+ .getPipeline().length >= 2);
+ out.close();
+ } finally {
+ DFSClientFaultInjector.set(old);
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b86234/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 7c40bdd..59da33b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -918,12 +918,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public synchronized ReplicaHandler createRbw(
StorageType storageType, ExtendedBlock b,
boolean allowLazyPersist) throws IOException {
- return createTemporary(storageType, b);
+ return createTemporary(storageType, b, false);
}
@Override // FsDatasetSpi
public synchronized ReplicaHandler createTemporary(
- StorageType storageType, ExtendedBlock b) throws IOException {
+ StorageType storageType, ExtendedBlock b, boolean isTransfer) throws IOException {
if (isValidBlock(b)) {
throw new ReplicaAlreadyExistsException("Block " + b +
" is valid, and cannot be written to.");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b86234/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 23072ce..63cdeb1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -141,7 +141,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
}
@Override
- public ReplicaHandler createTemporary(StorageType t, ExtendedBlock b)
+ public ReplicaHandler createTemporary(StorageType t, ExtendedBlock b, boolean isTransfer)
throws IOException {
return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b86234/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index 17558f1..648e8a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -457,44 +457,44 @@ public class TestWriteToReplica {
private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
try {
- dataSet.createTemporary(StorageType.DEFAULT, blocks[FINALIZED]);
+ dataSet.createTemporary(StorageType.DEFAULT, blocks[FINALIZED], false);
Assert.fail("Should not have created a temporary replica that was " +
"finalized " + blocks[FINALIZED]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
- dataSet.createTemporary(StorageType.DEFAULT, blocks[TEMPORARY]);
+ dataSet.createTemporary(StorageType.DEFAULT, blocks[TEMPORARY], false);
Assert.fail("Should not have created a replica that had created as" +
"temporary " + blocks[TEMPORARY]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
- dataSet.createTemporary(StorageType.DEFAULT, blocks[RBW]);
+ dataSet.createTemporary(StorageType.DEFAULT, blocks[RBW], false);
Assert.fail("Should not have created a replica that had created as RBW " +
blocks[RBW]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
- dataSet.createTemporary(StorageType.DEFAULT, blocks[RWR]);
+ dataSet.createTemporary(StorageType.DEFAULT, blocks[RWR], false);
Assert.fail("Should not have created a replica that was waiting to be " +
"recovered " + blocks[RWR]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
- dataSet.createTemporary(StorageType.DEFAULT, blocks[RUR]);
+ dataSet.createTemporary(StorageType.DEFAULT, blocks[RUR], false);
Assert.fail("Should not have created a replica that was under recovery " +
blocks[RUR]);
} catch (ReplicaAlreadyExistsException e) {
}
- dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+ dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT], false);
try {
- dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+ dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT], false);
Assert.fail("Should not have created a replica that had already been "
+ "created " + blocks[NON_EXISTENT]);
} catch (Exception e) {
@@ -506,8 +506,8 @@ public class TestWriteToReplica {
long newGenStamp = blocks[NON_EXISTENT].getGenerationStamp() * 10;
blocks[NON_EXISTENT].setGenerationStamp(newGenStamp);
try {
- ReplicaInPipelineInterface replicaInfo =
- dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]).getReplica();
+ ReplicaInPipelineInterface replicaInfo = dataSet.createTemporary(
+ StorageType.DEFAULT, blocks[NON_EXISTENT], false).getReplica();
Assert.assertTrue(replicaInfo.getGenerationStamp() == newGenStamp);
Assert.assertTrue(
replicaInfo.getBlockId() == blocks[NON_EXISTENT].getBlockId());