Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2017/06/01 21:46:48 UTC

hadoop git commit: HDFS-11856. Ability to re-add upgrading nodes to pipeline for future pipeline updates. Contributed by Vinayakumar B.

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 01cdea732 -> d3b86234b


HDFS-11856. Ability to re-add upgrading nodes to pipeline for future pipeline updates. Contributed by Vinayakumar B.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d3b86234
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d3b86234
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d3b86234

Branch: refs/heads/branch-2.7
Commit: d3b86234b29031fda2805a805705d336a179a816
Parents: 01cdea7
Author: Kihwal Lee <ki...@apache.org>
Authored: Thu Jun 1 16:45:55 2017 -0500
Committer: Kihwal Lee <ki...@apache.org>
Committed: Thu Jun 1 16:45:55 2017 -0500

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hadoop/hdfs/DFSClientFaultInjector.java     |   8 ++
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |  80 +++++++++-----
 .../hdfs/server/datanode/BlockReceiver.java     |   5 +-
 .../server/datanode/fsdataset/FsDatasetSpi.java |   5 +-
 .../impl/FsDatasetAsyncDiskService.java         |  12 +++
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  91 ++++++++++------
 .../TestClientProtocolForPipelineRecovery.java  | 103 ++++++++++++++++++-
 .../server/datanode/SimulatedFSDataset.java     |   4 +-
 .../extdataset/ExternalDatasetImpl.java         |   2 +-
 .../fsdataset/impl/TestWriteToReplica.java      |  18 ++--
 11 files changed, 260 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b86234/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index bac3499..2538699 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -340,6 +340,9 @@ Release 2.7.4 - UNRELEASED
 
     HDFS-5042. Completed files lost after power failure. (vinayakumarb via kihwal)
 
+    HDFS-11856. Ability to re-add upgrading nodes to pipeline for future
+    pipeline updates. (vinayakumarb via kihwal)
+
 Release 2.7.3 - 2016-08-25
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b86234/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
index 5392c66..974fb22 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
@@ -33,6 +33,10 @@ public class DFSClientFaultInjector {
   public static DFSClientFaultInjector instance = new DFSClientFaultInjector();
   public static AtomicLong exceptionNum = new AtomicLong(0);
 
+  public static void set(DFSClientFaultInjector dfsClientFaultInjector) {
+    instance = dfsClientFaultInjector;
+  }
+
   public static DFSClientFaultInjector get() {
     return instance;
   }
@@ -54,4 +58,8 @@ public class DFSClientFaultInjector {
   public void fetchFromDatanodeException() {}
 
   public void readFromDatanodeDelay() {}
+
+  public boolean skipRollingRestartWait() {
+    return false;
+  }
 }
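
Note (not part of the commit): the new skipRollingRestartWait() hook is intended to be swapped in from tests through the static set() method added above; when it returns true, the client treats every restarting datanode as remote. A minimal usage sketch, essentially the pattern the new test further down in this commit follows:

    DFSClientFaultInjector previous = DFSClientFaultInjector.get();
    DFSClientFaultInjector.set(new DFSClientFaultInjector() {
      @Override
      public boolean skipRollingRestartWait() {
        // Pretend every restarting datanode is remote so the client does
        // not sit out the rolling-restart deadline.
        return true;
      }
    });
    try {
      // ... exercise a write pipeline while datanodes restart ...
    } finally {
      // Always restore the default injector.
      DFSClientFaultInjector.set(previous);
    }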

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b86234/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 188502f..1289b30 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -346,6 +346,7 @@ public class DFSOutputStream extends FSOutputSummer
     volatile int errorIndex = -1;
     // Restarting node index
     AtomicInteger restartingNodeIndex = new AtomicInteger(-1);
+    volatile boolean waitForRestart = true;
     private long restartDeadline = 0; // Deadline of DN restart
     private BlockConstructionStage stage;  // block construction stage
     private long bytesSent = 0; // number of bytes that've been sent
@@ -353,6 +354,8 @@ public class DFSOutputStream extends FSOutputSummer
 
     /** Nodes have been used in the pipeline before and have failed. */
     private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
+    /** Restarting Nodes */
+    private final List<DatanodeInfo> restartingNodes = new ArrayList<>();
     /** The times have retried to recover pipeline, for the same packet. */
     private volatile int pipelineRecoveryCount = 0;
     /** Has the current block been hflushed? */
@@ -792,6 +795,13 @@ public class DFSOutputStream extends FSOutputSummer
         return true;
       }
 
+      /*
+       * Treat all nodes as remote for test when skip enabled.
+       */
+      if (DFSClientFaultInjector.get().skipRollingRestartWait()) {
+        return false;
+      }
+
       // Is it a local node?
       InetAddress addr = null;
       try {
@@ -852,10 +862,14 @@ public class DFSOutputStream extends FSOutputSummer
                 .getHeaderFlag(i));
               // Restart will not be treated differently unless it is
               // the local node or the only one in the pipeline.
-              if (PipelineAck.isRestartOOBStatus(reply) &&
-                  shouldWaitForRestart(i)) {
-                restartDeadline = dfsClient.getConf().datanodeRestartTimeout
-                    + Time.monotonicNow();
+              if (PipelineAck.isRestartOOBStatus(reply)) {
+                if (shouldWaitForRestart(i)) {
+                  restartDeadline = dfsClient.getConf().datanodeRestartTimeout
+                      + Time.monotonicNow();
+                  waitForRestart = true;
+                } else {
+                  waitForRestart = false;
+                }
                 setRestartingNodeIndex(i);
                 String message = "A datanode is restarting: " + targets[i];
                 DFSClient.LOG.info(message);
@@ -1171,18 +1185,24 @@ public class DFSOutputStream extends FSOutputSummer
         // This process will be repeated until the deadline or the datanode
         // starts back up.
         if (restartingNodeIndex.get() >= 0) {
-          // 4 seconds or the configured deadline period, whichever is shorter.
-          // This is the retry interval and recovery will be retried in this
-          // interval until timeout or success.
-          long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout,
-              4000L);
-          try {
-            Thread.sleep(delay);
-          } catch (InterruptedException ie) {
-            lastException.set(new IOException("Interrupted while waiting for " +
-                "datanode to restart. " + nodes[restartingNodeIndex.get()]));
-            streamerClosed = true;
-            return false;
+          if (!waitForRestart) {
+            setErrorIndex(restartingNodeIndex.get());
+          } else {
+            // 4 seconds or the configured deadline period, whichever is
+            // shorter.
+            // This is the retry interval and recovery will be retried in this
+            // interval until timeout or success.
+            long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout,
+                4000L);
+            try {
+              Thread.sleep(delay);
+            } catch (InterruptedException ie) {
+              lastException.set(new IOException("Interrupted while waiting for "
+                  + "datanode to restart. " + nodes[restartingNodeIndex
+                      .get()]));
+              streamerClosed = true;
+              return false;
+            }
           }
         }
         boolean isRecovery = hasError;
@@ -1204,9 +1224,14 @@ public class DFSOutputStream extends FSOutputSummer
             streamerClosed = true;
             return false;
           }
-          DFSClient.LOG.warn("Error Recovery for block " + block +
-              " in pipeline " + pipelineMsg + 
-              ": bad datanode " + nodes[errorIndex]);
+          String reason = "bad.";
+          if (restartingNodeIndex.get() == errorIndex) {
+            reason = "restarting.";
+            restartingNodes.add(nodes[errorIndex]);
+          }
+          DFSClient.LOG.warn("Error Recovery for block " + block
+              + " in pipeline " + pipelineMsg + ": datanode " + errorIndex + "("
+              + nodes[errorIndex] + ") is " + reason);
           failed.add(nodes[errorIndex]);
 
           DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
@@ -1229,7 +1254,7 @@ public class DFSOutputStream extends FSOutputSummer
             } else if (errorIndex < restartingNodeIndex.get()) {
               // the node index has shifted.
               restartingNodeIndex.decrementAndGet();
-            } else {
+            } else if (waitForRestart) {
               // this shouldn't happen...
               assert false;
             }
@@ -1458,7 +1483,11 @@ public class DFSOutputStream extends FSOutputSummer
           blockStream = out;
           result =  true; // success
           restartingNodeIndex.set(-1);
+          waitForRestart = true;
           hasError = false;
+          // remove all restarting nodes from failed nodes list
+          failed.removeAll(restartingNodes);
+          restartingNodes.clear();
         } catch (IOException ie) {
           if (restartingNodeIndex.get() == -1) {
             DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
@@ -1489,9 +1518,14 @@ public class DFSOutputStream extends FSOutputSummer
             errorIndex = 0;
           }
           // Check whether there is a restart worth waiting for.
-          if (checkRestart && shouldWaitForRestart(errorIndex)) {
-            restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
-                Time.monotonicNow();
+          if (checkRestart) {
+            if (shouldWaitForRestart(errorIndex)) {
+              restartDeadline = dfsClient.getConf().datanodeRestartTimeout
+                  + Time.monotonicNow();
+              waitForRestart = true;
+            } else {
+              waitForRestart = false;
+            }
             restartingNodeIndex.set(errorIndex);
             errorIndex = -1;
             DFSClient.LOG.info("Waiting for the datanode to be restarted: " +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b86234/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 45b6ce2..5e5e777 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -194,7 +194,8 @@ class BlockReceiver implements Closeable {
       // Open local disk out
       //
       if (isDatanode) { //replication or move
-        replicaHandler = datanode.data.createTemporary(storageType, block);
+        replicaHandler = datanode.data.createTemporary(storageType, block,
+            false);
       } else {
         switch (stage) {
         case PIPELINE_SETUP_CREATE:
@@ -223,7 +224,7 @@ class BlockReceiver implements Closeable {
         case TRANSFER_FINALIZED:
           // this is a transfer destination
           replicaHandler =
-              datanode.data.createTemporary(storageType, block);
+              datanode.data.createTemporary(storageType, block, isTransfer);
           break;
         default: throw new IOException("Unsupported stage " + stage + 
               " while receiving block " + block + " from " + inAddr);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b86234/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 8378725..ea9bd02 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -211,13 +211,14 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
 
   /**
    * Creates a temporary replica and returns the meta information of the replica
-   * 
    * @param b block
+   * @param isTransfer whether for transfer
+   *
    * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
    */
   public ReplicaHandler createTemporary(StorageType storageType,
-      ExtendedBlock b) throws IOException;
+      ExtendedBlock b, boolean isTransfer) throws IOException;
 
   /**
    * Creates a RBW replica and returns the meta info of the replica

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b86234/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index c1d3990..e6e5824 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -220,6 +220,18 @@ class FsDatasetAsyncDiskService {
         volumeRef, blockFile, metaFile, block, trashDirectory);
     execute(((FsVolumeImpl) volumeRef.getVolume()).getCurrentDir(), deletionTask);
   }
+
+  /**
+   * Delete the block file and meta file from the disk synchronously, adjust
+   * dfsUsed statistics accordingly.
+   */
+  void deleteSync(FsVolumeReference volumeRef, File blockFile, File metaFile,
+      ExtendedBlock block, String trashDirectory) {
+    LOG.info("Deleting " + block.getLocalBlock() + " file " + blockFile);
+    ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(
+        volumeRef, blockFile, metaFile, block, trashDirectory);
+    deletionTask.run();
+  }
   
   /** A task for deleting a block file and its associated meta file, as well
    *  as decrement the dfs usage of the volume.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b86234/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 76867b1..1886590 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1466,38 +1466,28 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override // FsDatasetSpi
-  public ReplicaHandler createTemporary(
-      StorageType storageType, ExtendedBlock b) throws IOException {
+  public ReplicaHandler createTemporary(StorageType storageType,
+      ExtendedBlock b, boolean isTransfer) throws IOException {
     long startTimeMs = Time.monotonicNow();
     long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
     ReplicaInfo lastFoundReplicaInfo = null;
+    boolean isInPipeline = false;
     do {
       synchronized (this) {
         ReplicaInfo currentReplicaInfo =
             volumeMap.get(b.getBlockPoolId(), b.getBlockId());
         if (currentReplicaInfo == lastFoundReplicaInfo) {
-          if (lastFoundReplicaInfo != null) {
-            invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo });
-          }
-          FsVolumeReference ref =
-              volumes.getNextVolume(storageType, b.getNumBytes());
-          FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
-          // create a temporary file to hold block in the designated volume
-          File f;
-          try {
-            f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
-          } catch (IOException e) {
-            IOUtils.cleanup(null, ref);
-            throw e;
-          }
-          ReplicaInPipeline newReplicaInfo =
-              new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v,
-                  f.getParentFile(), 0);
-          volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
-          return new ReplicaHandler(newReplicaInfo, ref);
+          break;
         } else {
-          if (!(currentReplicaInfo.getGenerationStamp() < b
-              .getGenerationStamp() && currentReplicaInfo instanceof ReplicaInPipeline)) {
+          isInPipeline = currentReplicaInfo.getState() == ReplicaState.TEMPORARY
+              || currentReplicaInfo.getState() == ReplicaState.RBW;
+          /*
+           * If the current block is old, reject.
+           * else If transfer request, then accept it.
+           * else if state is not RBW/Temporary, then reject
+           */
+          if ((currentReplicaInfo.getGenerationStamp() >= b.getGenerationStamp())
+              || (!isTransfer && !isInPipeline)) {
             throw new ReplicaAlreadyExistsException("Block " + b
                 + " already exists in state " + currentReplicaInfo.getState()
                 + " and thus cannot be created.");
@@ -1506,6 +1496,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         }
       }
 
+      if (!isInPipeline) {
+        continue;
+      }
       // Hang too long, just bail out. This is not supposed to happen.
       long writerStopMs = Time.monotonicNow() - startTimeMs;
       if (writerStopMs > writerStopTimeoutMs) {
@@ -1519,6 +1512,31 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       ((ReplicaInPipeline) lastFoundReplicaInfo)
           .stopWriter(writerStopTimeoutMs);
     } while (true);
+
+    if (lastFoundReplicaInfo != null) {
+      // Old blockfile should be deleted synchronously as it might collide
+      // with the new block if allocated in same volume.
+      // Do the deletion outside of lock as its DISK IO.
+      invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo },
+          false);
+    }
+    synchronized (this) {
+      FsVolumeReference ref = volumes.getNextVolume(storageType, b
+          .getNumBytes());
+      FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
+      // create a temporary file to hold block in the designated volume
+      File f;
+      try {
+        f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
+      } catch (IOException e) {
+        IOUtils.cleanup(null, ref);
+        throw e;
+      }
+      ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), b
+          .getGenerationStamp(), v, f.getParentFile(), 0);
+      volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
+      return new ReplicaHandler(newReplicaInfo, ref);
+    }
   }
 
   /**
@@ -1849,6 +1867,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   @Override // FsDatasetSpi
   public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
+    invalidate(bpid, invalidBlks, true);
+  }
+
+  private void invalidate(String bpid, Block[] invalidBlks, boolean async)
+      throws IOException {
     final List<String> errors = new ArrayList<String>();
     for (int i = 0; i < invalidBlks.length; i++) {
       final File f;
@@ -1910,14 +1933,22 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       // If the block is cached, start uncaching it.
       cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
 
-      // Delete the block asynchronously to make sure we can do it fast enough.
-      // It's ok to unlink the block file before the uncache operation
-      // finishes.
       try {
-        asyncDiskService.deleteAsync(v.obtainReference(), f,
-            FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
-            new ExtendedBlock(bpid, invalidBlks[i]),
-            dataStorage.getTrashDirectoryForBlockFile(bpid, f));
+        // Delete the block asynchronously to make sure we can do it fast
+        // enough.
+        // It's ok to unlink the block file before the uncache operation
+        // finishes.
+        if (async) {
+          asyncDiskService.deleteAsync(v.obtainReference(), f,
+              FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
+              new ExtendedBlock(bpid, invalidBlks[i]),
+              dataStorage.getTrashDirectoryForBlockFile(bpid, f));
+        } else {
+          asyncDiskService.deleteSync(v.obtainReference(), f,
+              FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
+              new ExtendedBlock(bpid, invalidBlks[i]),
+              dataStorage.getTrashDirectoryForBlockFile(bpid, f));
+        }
       } catch (ClosedChannelException e) {
         LOG.warn("Volume " + v + " is closed, ignore the deletion task for " +
             "block " + invalidBlks[i]);

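The FsDatasetImpl hunk above restructures createTemporary(): the wait loop now only decides whether an existing replica may be replaced, the stale block file (if any) is then deleted synchronously outside the dataset lock so it cannot collide with the new temporary file on the same volume, and only the volume selection and replica creation happen under the lock again. A schematic outline of that flow, with simplified stand-in types and helpers rather than the real dataset code:

    import java.io.IOException;

    class CreateTemporarySketch {
      interface Replica {
        long getGenerationStamp();
        boolean isTemporaryOrRbw();                // TEMPORARY or RBW state
        void stopWriter(long timeoutMs) throws IOException;
      }

      private final Object lock = new Object();

      Replica createTemporary(long genStamp, boolean isTransfer)
          throws IOException {
        Replica lastFound = null;
        while (true) {
          boolean inPipeline;
          synchronized (lock) {
            Replica current = lookupReplica();
            if (current == lastFound) {
              break;                               // nothing new in the way
            }
            inPipeline = current.isTemporaryOrRbw();
            // Reject stale generation stamps; reject replicas that are not
            // part of a write pipeline unless this is a transfer destination.
            if (current.getGenerationStamp() >= genStamp
                || (!isTransfer && !inPipeline)) {
              throw new IOException("Replica already exists");
            }
            lastFound = current;
          }
          if (!inPipeline) {
            continue;
          }
          lastFound.stopWriter(60000L);            // wait outside the lock
        }
        if (lastFound != null) {
          // Delete the old block file synchronously and outside the lock:
          // an async delete could collide with the new temporary file if it
          // ends up on the same volume, and deletion is disk I/O anyway.
          deleteBlockFileSync(lastFound);
        }
        synchronized (lock) {
          // Pick the next volume and create the temporary replica.
          return createTempReplicaOnNextVolume(genStamp);
        }
      }

      // Stand-ins for dataset internals, elided in this sketch.
      private Replica lookupReplica() { return null; }
      private void deleteBlockFileSync(Replica r) { }
      private Replica createTempReplicaOnNextVolume(long genStamp) { return null; }
    }
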
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b86234/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index 47fce0e..63a6f62 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -17,7 +17,12 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
@@ -39,13 +44,16 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import org.mockito.Mockito;
-import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This tests pipeline recovery related client protocol works correct or not.
  */
 public class TestClientProtocolForPipelineRecovery {
-  
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestClientProtocolForPipelineRecovery.class);
+
   @Test public void testGetNewStamp() throws IOException {
     int numDataNodes = 1;
     Configuration conf = new HdfsConfiguration();
@@ -428,4 +436,95 @@ public class TestClientProtocolForPipelineRecovery {
       DataNodeFaultInjector.set(oldDnInjector);
     }
   }
+
+  @Test
+  public void testPipelineRecoveryOnRemoteDatanodeUpgrade() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(
+        DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY,
+        true);
+    MiniDFSCluster cluster = null;
+    DFSClientFaultInjector old = DFSClientFaultInjector.get();
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      cluster.waitActive();
+      FileSystem fileSys = cluster.getFileSystem();
+
+      Path file = new Path("/testPipelineRecoveryOnDatanodeUpgrade");
+      DFSTestUtil.createFile(fileSys, file, 10240L, (short) 3, 0L);
+      // treat all restarting nodes as remote for test.
+      DFSClientFaultInjector.set(new DFSClientFaultInjector() {
+        public boolean skipRollingRestartWait() {
+          return true;
+        }
+      });
+
+      final DFSOutputStream out = (DFSOutputStream) fileSys.append(file)
+          .getWrappedStream();
+      final AtomicBoolean running = new AtomicBoolean(true);
+      final AtomicBoolean failed = new AtomicBoolean(false);
+      Thread t = new Thread() {
+        public void run() {
+          while (running.get()) {
+            try {
+              out.write("test".getBytes());
+              out.hflush();
+              // Keep writing data every one second
+              Thread.sleep(1000);
+            } catch (IOException | InterruptedException e) {
+              LOG.error("Exception during write", e);
+              failed.set(true);
+              break;
+            }
+          }
+          running.set(false);
+        }
+      };
+      t.start();
+      // Let write start
+      Thread.sleep(1000);
+      DatanodeInfo[] pipeline = out.getPipeline();
+      for (DatanodeInfo node : pipeline) {
+        assertFalse("Write should be going on", failed.get());
+        ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+        int indexToShutdown = 0;
+        for (int i = 0; i < dataNodes.size(); i++) {
+          if (dataNodes.get(i).getIpcPort() == node.getIpcPort()) {
+            indexToShutdown = i;
+            break;
+          }
+        }
+
+        // Note old genstamp to findout pipeline recovery
+        final long oldGs = out.getBlock().getGenerationStamp();
+        MiniDFSCluster.DataNodeProperties dnProps = cluster
+            .stopDataNodeForUpgrade(indexToShutdown);
+        GenericTestUtils.waitForThreadTermination(
+            "Async datanode shutdown thread", 100, 10000);
+        cluster.restartDataNode(dnProps, true);
+        cluster.waitActive();
+        // wait pipeline to be recovered
+        GenericTestUtils.waitFor(new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            return out.getBlock().getGenerationStamp() > oldGs;
+          }
+        }, 100, 10000);
+        Assert.assertEquals("The pipeline recovery count shouldn't increase", 0,
+            out.getPipelineRecoveryCount());
+      }
+      assertFalse("Write should be going on", failed.get());
+      running.set(false);
+      t.join();
+      out.write("testagain".getBytes());
+      assertTrue("There should be atleast 2 nodes in pipeline still", out
+          .getPipeline().length >= 2);
+      out.close();
+    } finally {
+      DFSClientFaultInjector.set(old);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
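
To run just the new test (assuming a normal Maven build of this branch, with a Surefire version new enough for method-level selection; otherwise drop the #method part and run the whole class):

    cd hadoop-hdfs-project/hadoop-hdfs
    mvn test -Dtest=TestClientProtocolForPipelineRecovery#testPipelineRecoveryOnRemoteDatanodeUpgrade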

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b86234/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 7c40bdd..59da33b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -918,12 +918,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   public synchronized ReplicaHandler createRbw(
       StorageType storageType, ExtendedBlock b,
       boolean allowLazyPersist) throws IOException {
-    return createTemporary(storageType, b);
+    return createTemporary(storageType, b, false);
   }
 
   @Override // FsDatasetSpi
   public synchronized ReplicaHandler createTemporary(
-      StorageType storageType, ExtendedBlock b) throws IOException {
+      StorageType storageType, ExtendedBlock b, boolean isTransfer) throws IOException {
     if (isValidBlock(b)) {
           throw new ReplicaAlreadyExistsException("Block " + b + 
               " is valid, and cannot be written to.");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b86234/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 23072ce..63cdeb1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -141,7 +141,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
-  public ReplicaHandler createTemporary(StorageType t, ExtendedBlock b)
+  public ReplicaHandler createTemporary(StorageType t, ExtendedBlock b, boolean isTransfer)
       throws IOException {
     return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b86234/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index 17558f1..648e8a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -457,44 +457,44 @@ public class TestWriteToReplica {
   
   private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[FINALIZED]);
+      dataSet.createTemporary(StorageType.DEFAULT, blocks[FINALIZED], false);
       Assert.fail("Should not have created a temporary replica that was " +
       		"finalized " + blocks[FINALIZED]);
     } catch (ReplicaAlreadyExistsException e) {
     }
  
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[TEMPORARY]);
+      dataSet.createTemporary(StorageType.DEFAULT, blocks[TEMPORARY], false);
       Assert.fail("Should not have created a replica that had created as" +
       		"temporary " + blocks[TEMPORARY]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[RBW]);
+      dataSet.createTemporary(StorageType.DEFAULT, blocks[RBW], false);
       Assert.fail("Should not have created a replica that had created as RBW " +
           blocks[RBW]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[RWR]);
+      dataSet.createTemporary(StorageType.DEFAULT, blocks[RWR], false);
       Assert.fail("Should not have created a replica that was waiting to be " +
       		"recovered " + blocks[RWR]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[RUR]);
+      dataSet.createTemporary(StorageType.DEFAULT, blocks[RUR], false);
       Assert.fail("Should not have created a replica that was under recovery " +
           blocks[RUR]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     
-    dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+    dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT], false);
 
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+      dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT], false);
       Assert.fail("Should not have created a replica that had already been "
           + "created " + blocks[NON_EXISTENT]);
     } catch (Exception e) {
@@ -506,8 +506,8 @@ public class TestWriteToReplica {
     long newGenStamp = blocks[NON_EXISTENT].getGenerationStamp() * 10;
     blocks[NON_EXISTENT].setGenerationStamp(newGenStamp);
     try {
-      ReplicaInPipelineInterface replicaInfo =
-          dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]).getReplica();
+      ReplicaInPipelineInterface replicaInfo = dataSet.createTemporary(
+          StorageType.DEFAULT, blocks[NON_EXISTENT], false).getReplica();
       Assert.assertTrue(replicaInfo.getGenerationStamp() == newGenStamp);
       Assert.assertTrue(
           replicaInfo.getBlockId() == blocks[NON_EXISTENT].getBlockId());


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org