Posted to common-commits@hadoop.apache.org by xg...@apache.org on 2016/12/14 21:59:54 UTC

[2/9] hadoop git commit: HDFS-11164: Mover should avoid unnecessary retries if the block is pinned. Contributed by Rakesh R

HDFS-11164: Mover should avoid unnecessary retries if the block is pinned. Contributed by Rakesh R
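
At a high level, the change works in three steps: the datanode now answers a copy/replace request for a pinned replica with a dedicated ERROR_BLOCK_PINNED status, the balancer Dispatcher surfaces that status as a BlockPinningException and records the failed (blockId, source datanode) pair, and the Mover consults that record so it never re-schedules a move that is guaranteed to fail again. A minimal sketch of the resulting exclusion check (the helper method is illustrative only, not part of this patch; excludedPinnedBlocks has the same shape as the field added to Mover below):

    import java.util.Map;
    import java.util.Set;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

    /** Sketch only: true when blockId is known to be pinned on src. */
    static boolean isPinnedOn(Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks,
        long blockId, DatanodeInfo src) {
      Set<DatanodeInfo> locs = excludedPinnedBlocks.get(blockId);
      // Set#contains relies on DatanodeInfo#equals, matching the explicit
      // loop added to Mover below.
      return locs != null && locs.contains(src);
    }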


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e24a923d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e24a923d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e24a923d

Branch: refs/heads/YARN-5734
Commit: e24a923db50879f7dbe5d2afac0e6757089fb07d
Parents: 9947aeb
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Tue Dec 13 17:09:58 2016 -0800
Committer: Uma Maheswara Rao G <um...@intel.com>
Committed: Tue Dec 13 17:09:58 2016 -0800

----------------------------------------------------------------------
 .../datatransfer/BlockPinningException.java     |  33 ++++
 .../datatransfer/DataTransferProtoUtil.java     |  17 +-
 .../src/main/proto/datatransfer.proto           |   1 +
 .../hadoop/hdfs/server/balancer/Dispatcher.java |  62 ++++++-
 .../hdfs/server/datanode/DataXceiver.java       |   8 +-
 .../apache/hadoop/hdfs/server/mover/Mover.java  |  26 ++-
 .../hdfs/server/datanode/DataNodeTestUtils.java |  28 ++++
 .../server/datanode/TestBlockReplacement.java   |  70 +++++++-
 .../hadoop/hdfs/server/mover/TestMover.java     | 163 ++++++++++++++++++-
 9 files changed, 395 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockPinningException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockPinningException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockPinningException.java
new file mode 100644
index 0000000..c2f12f9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockPinningException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.IOException;
+
+/**
+ * Indicates a failure due to block pinning.
+ */
+public class BlockPinningException extends IOException {
+
+  // Required by {@link java.io.Serializable}.
+  private static final long serialVersionUID = 1L;
+
+  public BlockPinningException(String errMsg) {
+    super(errMsg);
+  }
+}
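
Since BlockPinningException extends IOException, callers that want to treat pinning specially must catch it (or test with instanceof, as the Dispatcher below does) before generic IOException handling. A minimal sketch of the catch pattern, where dispatchBlockMove() and LOG are hypothetical placeholders rather than APIs from this patch:

    try {
      dispatchBlockMove();
    } catch (BlockPinningException e) {
      // A pinned replica will never move: record it and stop retrying.
      LOG.info("Block is pinned on its source; giving up on this move", e);
    } catch (IOException e) {
      // Any other failure is treated as transient and remains retryable.
      LOG.warn("Block move failed; will retry", e);
    }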

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
index 6801149..287928c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
@@ -24,11 +24,11 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -107,6 +107,11 @@ public abstract class DataTransferProtoUtil {
   public static void checkBlockOpStatus(
           BlockOpResponseProto response,
           String logInfo) throws IOException {
+    checkBlockOpStatus(response, logInfo, false);
+  }
+
+  public static void checkBlockOpStatus(BlockOpResponseProto response,
+      String logInfo, boolean checkBlockPinningErr) throws IOException {
     if (response.getStatus() != Status.SUCCESS) {
       if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
         throw new InvalidBlockTokenException(
@@ -114,6 +119,14 @@ public abstract class DataTransferProtoUtil {
           + ", status message " + response.getMessage()
           + ", " + logInfo
         );
+      } else if (checkBlockPinningErr
+          && response.getStatus() == Status.ERROR_BLOCK_PINNED) {
+        throw new BlockPinningException(
+            "Got error"
+            + ", status=" + response.getStatus().name()
+            + ", status message " + response.getMessage()
+            + ", " + logInfo
+          );
       } else {
         throw new IOException(
           "Got error"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
index 290b158..889361a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
@@ -243,6 +243,7 @@ enum Status {
   OOB_RESERVED2 = 10;         // Reserved
   OOB_RESERVED3 = 11;         // Reserved
   IN_PROGRESS = 12;
+  ERROR_BLOCK_PINNED = 13;
 }
 
 enum ShortCircuitFdResponse {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index eb3ed87..0e62da2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -36,6 +36,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -55,6 +56,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -224,6 +226,10 @@ public class Dispatcher {
       this.target = target;
     }
 
+    public DatanodeInfo getSource() {
+      return source.getDatanodeInfo();
+    }
+
     @Override
     public String toString() {
       final Block b = reportedBlock != null ? reportedBlock.getBlock() : null;
@@ -367,6 +373,15 @@ public class Dispatcher {
       } catch (IOException e) {
         LOG.warn("Failed to move " + this, e);
         target.getDDatanode().setHasFailure();
+        // Check whether the failure is due to a block pinning error.
+        if (e instanceof BlockPinningException) {
+          // A pinned block can't be moved; add it to the failure list so
+          // that the mover excludes it from pending moves in the next
+          // iteration.
+          target.getDDatanode().addBlockPinningFailures(this);
+          return;
+        }
+
         // Proxy or target may have some issues, delay before using these nodes
         // further in order to avoid a potential storm of "threads quota
         // exceeded" warnings when the dispatcher gets out of sync with work
@@ -419,7 +434,7 @@ public class Dispatcher {
         }
       }
       String logInfo = "reportedBlock move is failed";
-      DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
+      DataTransferProtoUtil.checkBlockOpStatus(response, logInfo, true);
     }
 
     /** reset the object */
@@ -600,6 +615,7 @@ public class Dispatcher {
     /** blocks being moved but not confirmed yet */
     private final List<PendingMove> pendings;
     private volatile boolean hasFailure = false;
+    private Map<Long, Set<DatanodeInfo>> blockPinningFailures = new HashMap<>();
     private volatile boolean hasSuccess = false;
     private ExecutorService moveExecutor;
 
@@ -685,6 +701,22 @@ public class Dispatcher {
       this.hasFailure = true;
     }
 
+    void addBlockPinningFailures(PendingMove pendingBlock) {
+      synchronized (blockPinningFailures) {
+        long blockId = pendingBlock.reportedBlock.getBlock().getBlockId();
+        Set<DatanodeInfo> pinnedLocations = blockPinningFailures.get(blockId);
+        if (pinnedLocations == null) {
+          pinnedLocations = new HashSet<>();
+          blockPinningFailures.put(blockId, pinnedLocations);
+        }
+        pinnedLocations.add(pendingBlock.getSource());
+      }
+    }
+
+    Map<Long, Set<DatanodeInfo>> getBlockPinningFailureList() {
+      return blockPinningFailures;
+    }
+
     void setHasSuccess() {
       this.hasSuccess = true;
     }
@@ -1155,6 +1187,34 @@ public class Dispatcher {
   }
 
   /**
+   * Check whether any of the block movements failed due to block-pinning
+   * errors. If so, add the failed blockId and its pinned source node
+   * locations to the excluded list.
+   */
+  public static void checkForBlockPinningFailures(
+      Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks,
+      Iterable<? extends StorageGroup> targets) {
+    for (StorageGroup t : targets) {
+      Map<Long, Set<DatanodeInfo>> blockPinningFailureList = t.getDDatanode()
+          .getBlockPinningFailureList();
+      Set<Entry<Long, Set<DatanodeInfo>>> entrySet = blockPinningFailureList
+          .entrySet();
+      for (Entry<Long, Set<DatanodeInfo>> entry : entrySet) {
+        Long blockId = entry.getKey();
+        Set<DatanodeInfo> locs = excludedPinnedBlocks.get(blockId);
+        if (locs == null) {
+          // blockId doesn't exist in the excluded list.
+          locs = entry.getValue();
+          excludedPinnedBlocks.put(blockId, locs);
+        } else {
+          // blockId already exists in the excluded list; add the pinned nodes.
+          locs.addAll(entry.getValue());
+        }
+      }
+    }
+  }
+
+  /**
    * @return true if some moves are success.
    */
   public static boolean checkForSuccess(
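
Both addBlockPinningFailures and the merge loop in checkForBlockPinningFailures follow a get-or-create pattern that Map#computeIfAbsent (Java 8+) expresses directly. An equivalent sketch of the merge, offered only as an alternative formulation; unlike the committed loop it copies into a fresh set instead of aliasing entry.getValue():

    for (Entry<Long, Set<DatanodeInfo>> entry
        : blockPinningFailureList.entrySet()) {
      excludedPinnedBlocks
          .computeIfAbsent(entry.getKey(), k -> new HashSet<>())
          .addAll(entry.getValue());
    }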

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index fee16b3..a35a5b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
@@ -1022,7 +1023,7 @@ class DataXceiver extends Receiver implements Runnable {
       String msg = "Not able to copy block " + block.getBlockId() + " " +
           "to " + peer.getRemoteAddressString() + " because it's pinned ";
       LOG.info(msg);
-      sendResponse(ERROR, msg);
+      sendResponse(Status.ERROR_BLOCK_PINNED, msg);
       return;
     }
     
@@ -1156,7 +1157,7 @@ class DataXceiver extends Receiver implements Runnable {
 
         String logInfo = "copy block " + block + " from "
             + proxySock.getRemoteSocketAddress();
-        DataTransferProtoUtil.checkBlockOpStatus(copyResponse, logInfo);
+        DataTransferProtoUtil.checkBlockOpStatus(copyResponse, logInfo, true);
 
         // get checksum info about the block we're copying
         ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
@@ -1183,6 +1184,9 @@ class DataXceiver extends Receiver implements Runnable {
       }
     } catch (IOException ioe) {
       opStatus = ERROR;
+      if (ioe instanceof BlockPinningException) {
+        opStatus = Status.ERROR_BLOCK_PINNED;
+      }
       errMsg = "opReplaceBlock " + block + " received exception " + ioe; 
       LOG.info(errMsg);
       if (!IoeDuringCopyBlockOperation) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 4ab55d3..bc75f0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.SecurityUtil;
@@ -117,10 +116,12 @@ public class Mover {
   private final List<Path> targetPaths;
   private final int retryMaxAttempts;
   private final AtomicInteger retryCount;
+  private final Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks;
 
   private final BlockStoragePolicy[] blockStoragePolicies;
 
-  Mover(NameNodeConnector nnc, Configuration conf, AtomicInteger retryCount) {
+  Mover(NameNodeConnector nnc, Configuration conf, AtomicInteger retryCount,
+      Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks) {
     final long movedWinWidth = conf.getLong(
         DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY,
         DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_DEFAULT);
@@ -144,6 +145,7 @@ public class Mover {
     this.targetPaths = nnc.getTargetPaths();
     this.blockStoragePolicies = new BlockStoragePolicy[1 <<
         BlockStoragePolicySuite.ID_BIT_LENGTH];
+    this.excludedPinnedBlocks = excludedPinnedBlocks;
   }
 
   void init() throws IOException {
@@ -292,6 +294,8 @@ public class Mover {
       // wait for pending move to finish and retry the failed migration
       boolean hasFailed = Dispatcher.waitForMoveCompletion(storages.targets
           .values());
+      Dispatcher.checkForBlockPinningFailures(excludedPinnedBlocks,
+          storages.targets.values());
       boolean hasSuccess = Dispatcher.checkForSuccess(storages.targets
           .values());
       if (hasFailed && !hasSuccess) {
@@ -461,6 +465,19 @@ public class Mover {
         return true;
       }
 
+      // Check whether the given block is pinned in the source datanode. A
+      // pinned block can't be moved to a different datanode, so we can skip
+      // scheduling moves for it from that node.
+      long blockId = db.getBlock().getBlockId();
+      if (excludedPinnedBlocks.containsKey(blockId)) {
+        Set<DatanodeInfo> locs = excludedPinnedBlocks.get(blockId);
+        for (DatanodeInfo dn : locs) {
+          if (source.getDatanodeInfo().equals(dn)) {
+            return false;
+          }
+        }
+      }
+
       if (dispatcher.getCluster().isNodeGroupAware()) {
         if (chooseTarget(db, source, targetTypes, Matcher.SAME_NODE_GROUP)) {
           return true;
@@ -614,6 +631,8 @@ public class Mover {
             DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT,
             TimeUnit.SECONDS) * 1000;
     AtomicInteger retryCount = new AtomicInteger(0);
+    // TODO: Limit the size of the pinned-blocks map to bound memory usage.
+    Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks = new HashMap<>();
     LOG.info("namenodes = " + namenodes);
 
     checkKeytabAndInit(conf);
@@ -628,7 +647,8 @@ public class Mover {
         Iterator<NameNodeConnector> iter = connectors.iterator();
         while (iter.hasNext()) {
           NameNodeConnector nnc = iter.next();
-          final Mover m = new Mover(nnc, conf, retryCount);
+          final Mover m = new Mover(nnc, conf, retryCount,
+              excludedPinnedBlocks);
           final ExitStatus r = m.run();
 
           if (r == ExitStatus.SUCCESS) {
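
Note that excludedPinnedBlocks is created once in the driver method shown above and handed to every per-nameservice Mover instance, so exclusions learned in one iteration survive into the next. A condensed sketch of that lifetime, simplified from the loop above (exit-status handling elided):

    Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks = new HashMap<>();
    while (connectors.size() > 0) {
      for (NameNodeConnector nnc : connectors) {
        // A fresh Mover per iteration, but the shared map guarantees that
        // pinned (blockId, datanode) pairs are never re-scheduled.
        final Mover m = new Mover(nnc, conf, retryCount,
            excludedPinnedBlocks);
        final ExitStatus r = m.run();
        // act on r and possibly remove nnc from connectors
      }
    }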

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
index e2755f9..3501ed3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
@@ -25,10 +25,17 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
 
 /**
  * Utility class for accessing package-private DataNode information during tests.
@@ -175,4 +182,25 @@ public class DataNodeTestUtils {
       dn.getDirectoryScanner().reconcile();
     }
   }
+
+  /**
+   * Mocks the datanode block-pinning API so that tests can simulate pinned
+   * replicas.
+   *
+   * @param dn datanode
+   * @param pinned true if the block is pinned, false otherwise
+   * @throws IOException
+   */
+  public static void mockDatanodeBlkPinning(final DataNode dn,
+      final boolean pinned) throws IOException {
+    final FsDatasetSpi<? extends FsVolumeSpi> data = dn.data;
+    dn.data = Mockito.spy(data);
+
+    doAnswer(new Answer<Object>() {
+      public Object answer(InvocationOnMock invocation) throws IOException {
+        // Ignore the real FsDatasetImpl#getPinning logic and report the
+        // configured pinned state for every block.
+        return pinned;
+      }
+    }).when(dn.data).getPinning(any(ExtendedBlock.class));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
index 597dc46..f811bd8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
@@ -208,6 +208,67 @@ public class TestBlockReplacement {
     }
   }
 
+  /**
+   * Test to verify that copying a pinned block to a different destination
+   * datanode fails with status Status.ERROR_BLOCK_PINNED.
+   */
+  @Test(timeout = 90000)
+  public void testBlockReplacementWithPinnedBlocks() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+
+    // Create a three-datanode cluster in which every datanode has DISK and
+    // ARCHIVE storage types.
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .storageTypes(
+            new StorageType[] {StorageType.DISK, StorageType.ARCHIVE})
+        .build();
+
+    try {
+      cluster.waitActive();
+
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      String fileName = "/testBlockReplacementWithPinnedBlocks/file";
+      final Path file = new Path(fileName);
+      DFSTestUtil.createFile(dfs, file, 1024, (short) 1, 1024);
+
+      LocatedBlock lb = dfs.getClient().getLocatedBlocks(fileName, 0).get(0);
+      DatanodeInfo[] oldNodes = lb.getLocations();
+      assertEquals("Wrong block locations", oldNodes.length, 1);
+      DatanodeInfo source = oldNodes[0];
+      ExtendedBlock b = lb.getBlock();
+
+      DatanodeInfo[] datanodes = dfs.getDataNodeStats();
+      DatanodeInfo destin = null;
+      for (DatanodeInfo datanodeInfo : datanodes) {
+        // Choose a destination node different from the source.
+        if (!oldNodes[0].equals(datanodeInfo)) {
+          destin = datanodeInfo;
+          break;
+        }
+      }
+
+      assertNotNull("Failed to choose destination datanode!", destin);
+
+      assertFalse("Source and destin datanode should be different",
+          source.equals(destin));
+
+      // Mock FsDatasetSpi#getPinning to show that the block is pinned.
+      for (int i = 0; i < cluster.getDataNodes().size(); i++) {
+        DataNode dn = cluster.getDataNodes().get(i);
+        LOG.info("Simulate block pinning in datanode " + dn);
+        DataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
+      }
+
+      // Block movement to a different datanode should fail as the block is
+      // pinned.
+      assertTrue("Status code mismatches!", replaceBlock(b, source, source,
+          destin, StorageType.ARCHIVE, Status.ERROR_BLOCK_PINNED));
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   @Test
   public void testBlockMoveAcrossStorageInSameNode() throws Exception {
     final Configuration conf = new HdfsConfiguration();
@@ -236,7 +297,7 @@ public class TestBlockReplacement {
       // move block to ARCHIVE by using same DataNodeInfo for source, proxy and
       // destination so that movement happens within datanode 
       assertTrue(replaceBlock(block, source, source, source,
-          StorageType.ARCHIVE));
+          StorageType.ARCHIVE, Status.SUCCESS));
       
       // wait till namenode notified
       Thread.sleep(3000);
@@ -311,7 +372,7 @@ public class TestBlockReplacement {
   private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source,
       DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
     return replaceBlock(block, source, sourceProxy, destination,
-        StorageType.DEFAULT);
+        StorageType.DEFAULT, Status.SUCCESS);
   }
 
   /*
@@ -322,7 +383,8 @@ public class TestBlockReplacement {
       DatanodeInfo source,
       DatanodeInfo sourceProxy,
       DatanodeInfo destination,
-      StorageType targetStorageType) throws IOException, SocketException {
+      StorageType targetStorageType,
+      Status opStatus) throws IOException, SocketException {
     Socket sock = new Socket();
     try {
       sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()),
@@ -342,7 +404,7 @@ public class TestBlockReplacement {
       while (proto.getStatus() == Status.IN_PROGRESS) {
         proto = BlockOpResponseProto.parseDelimitedFrom(reply);
       }
-      return proto.getStatus() == Status.SUCCESS;
+      return proto.getStatus() == opStatus;
     } finally {
       sock.close();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index 20a6959..d565548 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -37,11 +37,13 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSF
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -64,6 +66,7 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -72,6 +75,8 @@ import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
 import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
 import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
 import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
@@ -121,7 +126,7 @@ public class TestMover {
     final List<NameNodeConnector> nncs = NameNodeConnector.newNameNodeConnectors(
         nnMap, Mover.class.getSimpleName(), Mover.MOVER_ID_PATH, conf,
         NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
-    return new Mover(nncs.get(0), conf, new AtomicInteger(0));
+    return new Mover(nncs.get(0), conf, new AtomicInteger(0), new HashMap<>());
   }
 
   @Test
@@ -705,4 +710,160 @@ public class TestMover {
       UserGroupInformation.setConfiguration(new Configuration());
     }
   }
+
+  /**
+   * Test to verify that mover can't move pinned blocks.
+   */
+  @Test(timeout = 90000)
+  public void testMoverWithPinnedBlocks() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+
+    // Set a large retry-max-attempts value so that the test times out if
+    // block pinning errors are not handled properly during block movement.
+    conf.setInt(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, 10000);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3)
+        .build();
+    try {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final String file = "/testMoverWithPinnedBlocks/file";
+      Path dir = new Path("/testMoverWithPinnedBlocks");
+      dfs.mkdirs(dir);
+
+      // write to DISK
+      dfs.setStoragePolicy(dir, "HOT");
+      final FSDataOutputStream out = dfs.create(new Path(file));
+      byte[] fileData = StripedFileTestUtil
+          .generateBytes(DEFAULT_BLOCK_SIZE * 3);
+      out.write(fileData);
+      out.close();
+
+      // verify before movement
+      LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+      StorageType[] storageTypes = lb.getStorageTypes();
+      for (StorageType storageType : storageTypes) {
+        Assert.assertTrue(StorageType.DISK == storageType);
+      }
+
+      // Add one SSD-based datanode to the cluster.
+      StorageType[][] newtypes = new StorageType[][] {{StorageType.SSD}};
+      startAdditionalDNs(conf, 1, newtypes, cluster);
+
+      // Mock FsDatasetSpi#getPinning to show that the block is pinned.
+      for (int i = 0; i < cluster.getDataNodes().size(); i++) {
+        DataNode dn = cluster.getDataNodes().get(i);
+        LOG.info("Simulate block pinning in datanode {}", dn);
+        DataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
+      }
+
+      // move file blocks to ONE_SSD policy
+      dfs.setStoragePolicy(dir, "ONE_SSD");
+      int rc = ToolRunner.run(conf, new Mover.Cli(),
+          new String[] {"-p", dir.toString()});
+
+      int exitcode = ExitStatus.NO_MOVE_BLOCK.getExitCode();
+      Assert.assertEquals("Movement should fail", exitcode, rc);
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test to verify that the mover handles pinned blocks together with failed
+   * blocks, continuing to retry only the failed ones.
+   */
+  @Test(timeout = 90000)
+  public void testMoverFailedRetryWithPinnedBlocks() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    conf.set(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, "2");
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(2)
+        .storageTypes(
+            new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
+                {StorageType.DISK, StorageType.ARCHIVE}}).build();
+    try {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final String parentDir = "/parent";
+      dfs.mkdirs(new Path(parentDir));
+      final String file1 = "/parent/testMoverFailedRetryWithPinnedBlocks1";
+      // write to DISK
+      final FSDataOutputStream out = dfs.create(new Path(file1), (short) 2);
+      byte[] fileData = StripedFileTestUtil
+          .generateBytes(DEFAULT_BLOCK_SIZE * 2);
+      out.write(fileData);
+      out.close();
+
+      // Adding pinned blocks.
+      createFileWithFavoredDatanodes(conf, cluster, dfs);
+
+      // Delete the block file so that the block move fails with
+      // FileNotFoundException.
+      LocatedBlocks locatedBlocks = dfs.getClient().getLocatedBlocks(file1, 0);
+      Assert.assertEquals("Wrong block count", 2,
+          locatedBlocks.locatedBlockCount());
+      LocatedBlock lb = locatedBlocks.get(0);
+      cluster.corruptBlockOnDataNodesByDeletingBlockFile(lb.getBlock());
+
+      // move to ARCHIVE
+      dfs.setStoragePolicy(new Path(parentDir), "COLD");
+      int rc = ToolRunner.run(conf, new Mover.Cli(),
+          new String[] {"-p", parentDir});
+      Assert.assertEquals("Movement should fail after some retry",
+          ExitStatus.NO_MOVE_PROGRESS.getExitCode(), rc);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private void createFileWithFavoredDatanodes(final Configuration conf,
+      final MiniDFSCluster cluster, final DistributedFileSystem dfs)
+          throws IOException {
+    // Add two DISK-based datanodes to the cluster. Also, ensure that blocks
+    // are pinned on these new datanodes.
+    StorageType[][] newtypes =
+        new StorageType[][] {{StorageType.DISK}, {StorageType.DISK}};
+    startAdditionalDNs(conf, 2, newtypes, cluster);
+    ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+    InetSocketAddress[] favoredNodes = new InetSocketAddress[2];
+    int j = 0;
+    for (int i = dataNodes.size() - 1; i >= 2; i--) {
+      favoredNodes[j++] = dataNodes.get(i).getXferAddress();
+    }
+    final String file = "/parent/testMoverFailedRetryWithPinnedBlocks2";
+    final FSDataOutputStream out = dfs.create(new Path(file),
+        FsPermission.getDefault(), true, DEFAULT_BLOCK_SIZE, (short) 2,
+        DEFAULT_BLOCK_SIZE, null, favoredNodes);
+    byte[] fileData = StripedFileTestUtil.generateBytes(DEFAULT_BLOCK_SIZE * 2);
+    out.write(fileData);
+    out.close();
+
+    // Mock FsDatasetSpi#getPinning to show that the block is pinned.
+    LocatedBlocks locatedBlocks = dfs.getClient().getLocatedBlocks(file, 0);
+    Assert.assertEquals("Wrong block count", 2,
+        locatedBlocks.locatedBlockCount());
+    LocatedBlock lb = locatedBlocks.get(0);
+    DatanodeInfo datanodeInfo = lb.getLocations()[0];
+    for (DataNode dn : cluster.getDataNodes()) {
+      if (dn.getDatanodeId().getDatanodeUuid()
+          .equals(datanodeInfo.getDatanodeUuid())) {
+        LOG.info("Simulate block pinning in datanode {}", datanodeInfo);
+        DataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
+        break;
+      }
+    }
+  }
+
+  private void startAdditionalDNs(final Configuration conf,
+      int newNodesRequired, StorageType[][] newTypes,
+      final MiniDFSCluster cluster) throws IOException {
+
+    cluster.startDataNodes(conf, newNodesRequired, newTypes, true, null, null,
+        null, null, null, false, false, false, null);
+    cluster.triggerHeartbeats();
+  }
 }

