You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by um...@apache.org on 2018/08/12 10:22:12 UTC

[29/50] [abbrv] hadoop git commit: HDFS-13033: [SPS]: Implement a mechanism to do file block movements for external SPS. Contributed by Rakesh R.

HDFS-13033: [SPS]: Implement a mechanism to do file block movements for external SPS. Contributed by Rakesh R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b0cb8d9b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b0cb8d9b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b0cb8d9b

Branch: refs/heads/HDFS-10285
Commit: b0cb8d9bb44c963ae686d2b5c1b70bc76b955e10
Parents: 3159b39
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Tue Jan 23 16:19:46 2018 -0800
Committer: Uma Maheswara Rao Gangumalla <um...@apache.org>
Committed: Sun Aug 12 03:06:03 2018 -0700

----------------------------------------------------------------------
 .../hdfs/server/balancer/NameNodeConnector.java |   8 +
 .../hdfs/server/common/sps/BlockDispatcher.java | 186 +++++++++++++
 .../sps/BlockMovementAttemptFinished.java       |  80 ++++++
 .../server/common/sps/BlockMovementStatus.java  |  53 ++++
 .../common/sps/BlockStorageMovementTracker.java | 184 +++++++++++++
 .../sps/BlocksMovementsStatusHandler.java       |  95 +++++++
 .../hdfs/server/common/sps/package-info.java    |  27 ++
 .../datanode/BlockStorageMovementTracker.java   | 186 -------------
 .../datanode/StoragePolicySatisfyWorker.java    | 271 ++-----------------
 .../hdfs/server/namenode/FSNamesystem.java      |   4 +-
 .../namenode/sps/BlockMoveTaskHandler.java      |   3 +-
 .../sps/BlockStorageMovementAttemptedItems.java |  12 +-
 .../IntraSPSNameNodeBlockMoveTaskHandler.java   |   3 +-
 .../hdfs/server/namenode/sps/SPSService.java    |  14 +-
 .../namenode/sps/StoragePolicySatisfier.java    |  30 +-
 .../sps/ExternalSPSBlockMoveTaskHandler.java    | 233 ++++++++++++++++
 .../TestBlockStorageMovementAttemptedItems.java |   2 +-
 .../sps/TestStoragePolicySatisfier.java         |   6 +-
 .../sps/TestExternalStoragePolicySatisfier.java |  69 ++++-
 19 files changed, 997 insertions(+), 469 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index b0dd779..6bfbbb3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -269,6 +269,14 @@ public class NameNodeConnector implements Closeable {
     }
   }
 
+  /**
+   * Returns fallbackToSimpleAuth. This will be true or false during calls to
+   * indicate if a secure client falls back to simple auth.
+   */
+  public AtomicBoolean getFallbackToSimpleAuth() {
+    return fallbackToSimpleAuth;
+  }
+
   @Override
   public void close() {
     keyManager.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java
new file mode 100644
index 0000000..f87fcae
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.common.sps;
+
+import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Dispatching block replica moves between datanodes to satisfy the storage
+ * policy.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockDispatcher {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(BlockDispatcher.class);
+
+  private final boolean connectToDnViaHostname;
+  private final int socketTimeout;
+  private final int ioFileBufferSize;
+
+  /**
+   * Construct block dispatcher details.
+   *
+   * @param sockTimeout
+   *          soTimeout
+   * @param ioFileBuffSize
+   *          file io buffer size
+   * @param connectToDatanodeViaHostname
+   *          true represents connect via hostname, false otw
+   */
+  public BlockDispatcher(int sockTimeout, int ioFileBuffSize,
+      boolean connectToDatanodeViaHostname) {
+    this.socketTimeout = sockTimeout;
+    this.ioFileBufferSize = ioFileBuffSize;
+    this.connectToDnViaHostname = connectToDatanodeViaHostname;
+  }
+
+  /**
+   * Moves the given block replica to the given target node and wait for the
+   * response.
+   *
+   * @param blkMovingInfo
+   *          block to storage info
+   * @param saslClient
+   *          SASL for DataTransferProtocol on behalf of a client
+   * @param eb
+   *          extended block info
+   * @param sock
+   *          target node's socket
+   * @param km
+   *          for creation of an encryption key
+   * @param accessToken
+   *          connection block access token
+   * @return status of the block movement
+   */
+  public BlockMovementStatus moveBlock(BlockMovingInfo blkMovingInfo,
+      SaslDataTransferClient saslClient, ExtendedBlock eb, Socket sock,
+      DataEncryptionKeyFactory km, Token<BlockTokenIdentifier> accessToken) {
+    LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy "
+        + "storageType, sourceStoragetype:{} and destinStoragetype:{}",
+        blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
+        blkMovingInfo.getTarget(), blkMovingInfo.getSourceStorageType(),
+        blkMovingInfo.getTargetStorageType());
+    DataOutputStream out = null;
+    DataInputStream in = null;
+    try {
+      NetUtils.connect(sock,
+          NetUtils.createSocketAddr(
+              blkMovingInfo.getTarget().getXferAddr(connectToDnViaHostname)),
+          socketTimeout);
+      // Set read timeout so that it doesn't hang forever against
+      // unresponsive nodes. Datanode normally sends IN_PROGRESS response
+      // twice within the client read timeout period (every 30 seconds by
+      // default). Here, we make it give up after "socketTimeout * 5" period
+      // of no response.
+      sock.setSoTimeout(socketTimeout * 5);
+      sock.setKeepAlive(true);
+      OutputStream unbufOut = sock.getOutputStream();
+      InputStream unbufIn = sock.getInputStream();
+      LOG.debug("Connecting to datanode {}", blkMovingInfo.getTarget());
+
+      IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+          unbufIn, km, accessToken, blkMovingInfo.getTarget());
+      unbufOut = saslStreams.out;
+      unbufIn = saslStreams.in;
+      out = new DataOutputStream(
+          new BufferedOutputStream(unbufOut, ioFileBufferSize));
+      in = new DataInputStream(
+          new BufferedInputStream(unbufIn, ioFileBufferSize));
+      sendRequest(out, eb, accessToken, blkMovingInfo.getSource(),
+          blkMovingInfo.getTargetStorageType());
+      receiveResponse(in);
+
+      LOG.info(
+          "Successfully moved block:{} from src:{} to destin:{} for"
+              + " satisfying storageType:{}",
+          blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
+          blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType());
+      return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
+    } catch (BlockPinningException e) {
+      // Pinned block won't be able to move to a different node. So, its not
+      // required to do retries, just marked as SUCCESS.
+      LOG.debug("Pinned block can't be moved, so skipping block:{}",
+          blkMovingInfo.getBlock(), e);
+      return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
+    } catch (IOException e) {
+      // TODO: handle failure retries
+      LOG.warn(
+          "Failed to move block:{} from src:{} to destin:{} to satisfy "
+              + "storageType:{}",
+          blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
+          blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e);
+      return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
+    } finally {
+      IOUtils.closeStream(out);
+      IOUtils.closeStream(in);
+      IOUtils.closeSocket(sock);
+    }
+  }
+
+  /** Send a reportedBlock replace request to the output stream. */
+  private static void sendRequest(DataOutputStream out, ExtendedBlock eb,
+      Token<BlockTokenIdentifier> accessToken, DatanodeInfo source,
+      StorageType targetStorageType) throws IOException {
+    new Sender(out).replaceBlock(eb, targetStorageType, accessToken,
+        source.getDatanodeUuid(), source, null);
+  }
+
+  /** Receive a reportedBlock copy response from the input stream. */
+  private static void receiveResponse(DataInputStream in) throws IOException {
+    BlockOpResponseProto response = BlockOpResponseProto
+        .parseFrom(vintPrefixed(in));
+    while (response.getStatus() == Status.IN_PROGRESS) {
+      // read intermediate responses
+      response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
+    }
+    String logInfo = "reportedBlock move is failed";
+    DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java
new file mode 100644
index 0000000..419d806
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.common.sps;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+
+/**
+ * This class represents status from a block movement task. This will have the
+ * information of the task which was successful or failed due to errors.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockMovementAttemptFinished {
+  private final Block block;
+  private final DatanodeInfo src;
+  private final DatanodeInfo target;
+  private final BlockMovementStatus status;
+
+  /**
+   * Construct movement attempt finished info.
+   *
+   * @param block
+   *          block
+   * @param src
+   *          src datanode
+   * @param target
+   *          target datanode
+   * @param status
+   *          movement status
+   */
+  public BlockMovementAttemptFinished(Block block, DatanodeInfo src,
+      DatanodeInfo target, BlockMovementStatus status) {
+    this.block = block;
+    this.src = src;
+    this.target = target;
+    this.status = status;
+  }
+
+  /**
+   * @return details of the block, which attempted to move from src to target
+   *         node.
+   */
+  public Block getBlock() {
+    return block;
+  }
+
+  /**
+   * @return block movement status code.
+   */
+  public BlockMovementStatus getStatus() {
+    return status;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder().append("Block movement attempt finished(\n  ")
+        .append(" block : ").append(block).append(" src node: ").append(src)
+        .append(" target node: ").append(target).append(" movement status: ")
+        .append(status).append(")").toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementStatus.java
new file mode 100644
index 0000000..f70d84f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementStatus.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.common.sps;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Block movement status code.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public enum BlockMovementStatus {
+  /** Success. */
+  DN_BLK_STORAGE_MOVEMENT_SUCCESS(0),
+  /**
+   * Failure due to generation time stamp mismatches or network errors
+   * or no available space.
+   */
+  DN_BLK_STORAGE_MOVEMENT_FAILURE(-1);
+
+  // TODO: need to support different type of failures. Failure due to network
+  // errors, block pinned, no space available etc.
+
+  private final int code;
+
+  BlockMovementStatus(int code) {
+    this.code = code;
+  }
+
+  /**
+   * @return the status code.
+   */
+  int getStatusCode() {
+    return code;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java
new file mode 100644
index 0000000..b20d6cf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common.sps;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is used to track the completion of block movement future tasks.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockStorageMovementTracker implements Runnable {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(BlockStorageMovementTracker.class);
+  private final CompletionService<BlockMovementAttemptFinished> moverCompletionService;
+  private final BlocksMovementsStatusHandler blksMovementsStatusHandler;
+
+  // Keeps the information - block vs its list of future move tasks
+  private final Map<Block, List<Future<BlockMovementAttemptFinished>>> moverTaskFutures;
+  private final Map<Block, List<BlockMovementAttemptFinished>> movementResults;
+
+  private volatile boolean running = true;
+
+  /**
+   * BlockStorageMovementTracker constructor.
+   *
+   * @param moverCompletionService
+   *          completion service.
+   * @param handler
+   *          blocks movements status handler
+   */
+  public BlockStorageMovementTracker(
+      CompletionService<BlockMovementAttemptFinished> moverCompletionService,
+      BlocksMovementsStatusHandler handler) {
+    this.moverCompletionService = moverCompletionService;
+    this.moverTaskFutures = new HashMap<>();
+    this.blksMovementsStatusHandler = handler;
+    this.movementResults = new HashMap<>();
+  }
+
+  @Override
+  public void run() {
+    while (running) {
+      if (moverTaskFutures.size() <= 0) {
+        try {
+          synchronized (moverTaskFutures) {
+            // Waiting for mover tasks.
+            moverTaskFutures.wait(2000);
+          }
+        } catch (InterruptedException ignore) {
+          // Sets interrupt flag of this thread.
+          Thread.currentThread().interrupt();
+        }
+      }
+      try {
+        Future<BlockMovementAttemptFinished> future =
+            moverCompletionService.take();
+        if (future != null) {
+          BlockMovementAttemptFinished result = future.get();
+          LOG.debug("Completed block movement. {}", result);
+          Block block = result.getBlock();
+          List<Future<BlockMovementAttemptFinished>> blocksMoving =
+              moverTaskFutures.get(block);
+          if (blocksMoving == null) {
+            LOG.warn("Future task doesn't exist for block : {} ", block);
+            continue;
+          }
+          blocksMoving.remove(future);
+
+          List<BlockMovementAttemptFinished> resultPerTrackIdList =
+              addMovementResultToBlockIdList(result);
+
+          // Completed all the scheduled blocks movement under this 'trackId'.
+          if (blocksMoving.isEmpty() || moverTaskFutures.get(block) == null) {
+            synchronized (moverTaskFutures) {
+              moverTaskFutures.remove(block);
+            }
+            if (running) {
+              // handle completed or inprogress blocks movements per trackId.
+              blksMovementsStatusHandler.handle(resultPerTrackIdList);
+            }
+            movementResults.remove(block);
+          }
+        }
+      } catch (InterruptedException e) {
+        if (running) {
+          LOG.error("Exception while moving block replica to target storage"
+              + " type", e);
+        }
+      } catch (ExecutionException e) {
+        // TODO: Do we need failure retries and implement the same if required.
+        LOG.error("Exception while moving block replica to target storage type",
+            e);
+      }
+    }
+  }
+
+  private List<BlockMovementAttemptFinished> addMovementResultToBlockIdList(
+      BlockMovementAttemptFinished result) {
+    Block block = result.getBlock();
+    List<BlockMovementAttemptFinished> perBlockIdList;
+    synchronized (movementResults) {
+      perBlockIdList = movementResults.get(block);
+      if (perBlockIdList == null) {
+        perBlockIdList = new ArrayList<>();
+        movementResults.put(block, perBlockIdList);
+      }
+      perBlockIdList.add(result);
+    }
+    return perBlockIdList;
+  }
+
+  /**
+   * Add future task to the tracking list to check the completion status of the
+   * block movement.
+   *
+   * @param blockID
+   *          block identifier
+   * @param futureTask
+   *          future task used for moving the respective block
+   */
+  public void addBlock(Block block,
+      Future<BlockMovementAttemptFinished> futureTask) {
+    synchronized (moverTaskFutures) {
+      List<Future<BlockMovementAttemptFinished>> futures =
+          moverTaskFutures.get(block);
+      // null for the first task
+      if (futures == null) {
+        futures = new ArrayList<>();
+        moverTaskFutures.put(block, futures);
+      }
+      futures.add(futureTask);
+      // Notify waiting tracker thread about the newly added tasks.
+      moverTaskFutures.notify();
+    }
+  }
+
+  /**
+   * Clear the pending movement and movement result queues.
+   */
+  public void removeAll() {
+    synchronized (moverTaskFutures) {
+      moverTaskFutures.clear();
+    }
+    synchronized (movementResults) {
+      movementResults.clear();
+    }
+  }
+
+  /**
+   * Sets running flag to false and clear the pending movement result queues.
+   */
+  public void stopTracking() {
+    running = false;
+    removeAll();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java
new file mode 100644
index 0000000..f9f3954
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.common.sps;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/**
+ * Blocks movements status handler, which is used to collect details of the
+ * completed block movements and later these attempted finished(with success or
+ * failure) blocks can be accessed to notify respective listeners, if any.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlocksMovementsStatusHandler {
+  private final List<Block> blockIdVsMovementStatus = new ArrayList<>();
+
+  /**
+   * Collect all the storage movement attempt finished blocks. Later this will
+   * be send to namenode via heart beat.
+   *
+   * @param moveAttemptFinishedBlks
+   *          set of storage movement attempt finished blocks
+   */
+  public void handle(
+      List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) {
+    List<Block> blocks = new ArrayList<>();
+
+    for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) {
+      blocks.add(item.getBlock());
+    }
+    // Adding to the tracking report list. Later this can be accessed to know
+    // the attempted block movements.
+    synchronized (blockIdVsMovementStatus) {
+      blockIdVsMovementStatus.addAll(blocks);
+    }
+  }
+
+  /**
+   * @return unmodifiable list of storage movement attempt finished blocks.
+   */
+  public List<Block> getMoveAttemptFinishedBlocks() {
+    List<Block> moveAttemptFinishedBlks = new ArrayList<>();
+    // 1. Adding all the completed block ids.
+    synchronized (blockIdVsMovementStatus) {
+      if (blockIdVsMovementStatus.size() > 0) {
+        moveAttemptFinishedBlks = Collections
+            .unmodifiableList(blockIdVsMovementStatus);
+      }
+    }
+    return moveAttemptFinishedBlks;
+  }
+
+  /**
+   * Remove the storage movement attempt finished blocks from the tracking list.
+   *
+   * @param moveAttemptFinishedBlks
+   *          set of storage movement attempt finished blocks
+   */
+  public void remove(List<Block> moveAttemptFinishedBlks) {
+    if (moveAttemptFinishedBlks != null) {
+      blockIdVsMovementStatus.removeAll(moveAttemptFinishedBlks);
+    }
+  }
+
+  /**
+   * Clear the blockID vs movement status tracking map.
+   */
+  public void removeAll() {
+    synchronized (blockIdVsMovementStatus) {
+      blockIdVsMovementStatus.clear();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/package-info.java
new file mode 100644
index 0000000..fcffbe9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/package-info.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package provides commonly used classes for the block movement.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.hdfs.server.common.sps;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
deleted file mode 100644
index b3b9fd9..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementAttemptFinished;
-import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsStatusHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class is used to track the completion of block movement future tasks.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class BlockStorageMovementTracker implements Runnable {
-  private static final Logger LOG = LoggerFactory
-      .getLogger(BlockStorageMovementTracker.class);
-  private final CompletionService<BlockMovementAttemptFinished> moverCompletionService;
-  private final BlocksMovementsStatusHandler blksMovementsStatusHandler;
-
-  // Keeps the information - block vs its list of future move tasks
-  private final Map<Block, List<Future<BlockMovementAttemptFinished>>> moverTaskFutures;
-  private final Map<Block, List<BlockMovementAttemptFinished>> movementResults;
-
-  private volatile boolean running = true;
-
-  /**
-   * BlockStorageMovementTracker constructor.
-   *
-   * @param moverCompletionService
-   *          completion service.
-   * @param handler
-   *          blocks movements status handler
-   */
-  public BlockStorageMovementTracker(
-      CompletionService<BlockMovementAttemptFinished> moverCompletionService,
-      BlocksMovementsStatusHandler handler) {
-    this.moverCompletionService = moverCompletionService;
-    this.moverTaskFutures = new HashMap<>();
-    this.blksMovementsStatusHandler = handler;
-    this.movementResults = new HashMap<>();
-  }
-
-  @Override
-  public void run() {
-    while (running) {
-      if (moverTaskFutures.size() <= 0) {
-        try {
-          synchronized (moverTaskFutures) {
-            // Waiting for mover tasks.
-            moverTaskFutures.wait(2000);
-          }
-        } catch (InterruptedException ignore) {
-          // Sets interrupt flag of this thread.
-          Thread.currentThread().interrupt();
-        }
-      }
-      try {
-        Future<BlockMovementAttemptFinished> future =
-            moverCompletionService.take();
-        if (future != null) {
-          BlockMovementAttemptFinished result = future.get();
-          LOG.debug("Completed block movement. {}", result);
-          Block block = result.getBlock();
-          List<Future<BlockMovementAttemptFinished>> blocksMoving =
-              moverTaskFutures.get(block);
-          if (blocksMoving == null) {
-            LOG.warn("Future task doesn't exist for block : {} ", block);
-            continue;
-          }
-          blocksMoving.remove(future);
-
-          List<BlockMovementAttemptFinished> resultPerTrackIdList =
-              addMovementResultToBlockIdList(result);
-
-          // Completed all the scheduled blocks movement under this 'trackId'.
-          if (blocksMoving.isEmpty() || moverTaskFutures.get(block) == null) {
-            synchronized (moverTaskFutures) {
-              moverTaskFutures.remove(block);
-            }
-            if (running) {
-              // handle completed or inprogress blocks movements per trackId.
-              blksMovementsStatusHandler.handle(resultPerTrackIdList);
-            }
-            movementResults.remove(block);
-          }
-        }
-      } catch (InterruptedException e) {
-        if (running) {
-          LOG.error("Exception while moving block replica to target storage"
-              + " type", e);
-        }
-      } catch (ExecutionException e) {
-        // TODO: Do we need failure retries and implement the same if required.
-        LOG.error("Exception while moving block replica to target storage type",
-            e);
-      }
-    }
-  }
-
-  private List<BlockMovementAttemptFinished> addMovementResultToBlockIdList(
-      BlockMovementAttemptFinished result) {
-    Block block = result.getBlock();
-    List<BlockMovementAttemptFinished> perBlockIdList;
-    synchronized (movementResults) {
-      perBlockIdList = movementResults.get(block);
-      if (perBlockIdList == null) {
-        perBlockIdList = new ArrayList<>();
-        movementResults.put(block, perBlockIdList);
-      }
-      perBlockIdList.add(result);
-    }
-    return perBlockIdList;
-  }
-
-  /**
-   * Add future task to the tracking list to check the completion status of the
-   * block movement.
-   *
-   * @param blockID
-   *          block identifier
-   * @param futureTask
-   *          future task used for moving the respective block
-   */
-  void addBlock(Block block,
-      Future<BlockMovementAttemptFinished> futureTask) {
-    synchronized (moverTaskFutures) {
-      List<Future<BlockMovementAttemptFinished>> futures =
-          moverTaskFutures.get(block);
-      // null for the first task
-      if (futures == null) {
-        futures = new ArrayList<>();
-        moverTaskFutures.put(block, futures);
-      }
-      futures.add(futureTask);
-      // Notify waiting tracker thread about the newly added tasks.
-      moverTaskFutures.notify();
-    }
-  }
-
-  /**
-   * Clear the pending movement and movement result queues.
-   */
-  void removeAll() {
-    synchronized (moverTaskFutures) {
-      moverTaskFutures.clear();
-    }
-    synchronized (movementResults) {
-      movementResults.clear();
-    }
-  }
-
-  /**
-   * Sets running flag to false and clear the pending movement result queues.
-   */
-  public void stopTracking() {
-    running = false;
-    removeAll();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index 9a9c7e0..42f2e93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -17,21 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.EnumSet;
-import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
@@ -47,20 +35,15 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished;
+import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus;
+import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker;
+import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler;
+import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
@@ -81,7 +64,6 @@ public class StoragePolicySatisfyWorker {
       .getLogger(StoragePolicySatisfyWorker.class);
 
   private final DataNode datanode;
-  private final int ioFileBufferSize;
 
   private final int moverThreads;
   private final ExecutorService moveExecutor;
@@ -89,10 +71,10 @@ public class StoragePolicySatisfyWorker {
   private final BlocksMovementsStatusHandler handler;
   private final BlockStorageMovementTracker movementTracker;
   private Daemon movementTrackerThread;
+  private final BlockDispatcher blkDispatcher;
 
   public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
     this.datanode = datanode;
-    this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
 
     moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
         DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
@@ -103,7 +85,10 @@ public class StoragePolicySatisfyWorker {
         handler);
     movementTrackerThread = new Daemon(movementTracker);
     movementTrackerThread.setName("BlockStorageMovementTracker");
-
+    DNConf dnConf = datanode.getDnConf();
+    int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
+    blkDispatcher = new BlockDispatcher(dnConf.getSocketTimeout(),
+        ioFileBufferSize, dnConf.getConnectToDnViaHostname());
     // TODO: Needs to manage the number of concurrent moves per DataNode.
   }
 
@@ -183,8 +168,7 @@ public class StoragePolicySatisfyWorker {
       assert sourceStorageType != targetStorageType
           : "Source and Target storage type shouldn't be same!";
       BlockMovingTask blockMovingTask = new BlockMovingTask(blockPoolID,
-          blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
-          blkMovingInfo.getTarget(), sourceStorageType, targetStorageType);
+          blkMovingInfo);
       Future<BlockMovementAttemptFinished> moveCallable = moverCompletionService
           .submit(blockMovingTask);
       movementTracker.addBlock(blkMovingInfo.getBlock(),
@@ -199,244 +183,45 @@ public class StoragePolicySatisfyWorker {
   private class BlockMovingTask implements
       Callable<BlockMovementAttemptFinished> {
     private final String blockPoolID;
-    private final Block block;
-    private final DatanodeInfo source;
-    private final DatanodeInfo target;
-    private final StorageType srcStorageType;
-    private final StorageType targetStorageType;
+    private final BlockMovingInfo blkMovingInfo;
 
-    BlockMovingTask(String blockPoolID, Block block,
-        DatanodeInfo source, DatanodeInfo target,
-        StorageType srcStorageType, StorageType targetStorageType) {
+    BlockMovingTask(String blockPoolID, BlockMovingInfo blkMovInfo) {
       this.blockPoolID = blockPoolID;
-      this.block = block;
-      this.source = source;
-      this.target = target;
-      this.srcStorageType = srcStorageType;
-      this.targetStorageType = targetStorageType;
+      this.blkMovingInfo = blkMovInfo;
     }
 
     @Override
     public BlockMovementAttemptFinished call() {
       BlockMovementStatus status = moveBlock();
-      return new BlockMovementAttemptFinished(block, source, target, status);
+      return new BlockMovementAttemptFinished(blkMovingInfo.getBlock(),
+          blkMovingInfo.getSource(), blkMovingInfo.getTarget(), status);
     }
 
     private BlockMovementStatus moveBlock() {
-      LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy "
-          + "storageType, sourceStoragetype:{} and destinStoragetype:{}",
-          block, source, target, srcStorageType, targetStorageType);
-      Socket sock = null;
-      DataOutputStream out = null;
-      DataInputStream in = null;
+      datanode.incrementXmitsInProgress();
+      ExtendedBlock eb = new ExtendedBlock(blockPoolID,
+          blkMovingInfo.getBlock());
       try {
-        datanode.incrementXmitsInProgress();
-
-        ExtendedBlock extendedBlock = new ExtendedBlock(blockPoolID, block);
-        DNConf dnConf = datanode.getDnConf();
-
-        String dnAddr = datanode.getDatanodeId()
-            .getXferAddr(dnConf.getConnectToDnViaHostname());
-        sock = datanode.newSocket();
-        NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr),
-            dnConf.getSocketTimeout());
-        sock.setSoTimeout(2 * dnConf.getSocketTimeout());
-        LOG.debug("Connecting to datanode {}", dnAddr);
-
-        OutputStream unbufOut = sock.getOutputStream();
-        InputStream unbufIn = sock.getInputStream();
         Token<BlockTokenIdentifier> accessToken = datanode.getBlockAccessToken(
-            extendedBlock, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
-            new StorageType[]{targetStorageType}, new String[0]);
-
+            eb, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
+            new StorageType[]{blkMovingInfo.getTargetStorageType()},
+            new String[0]);
         DataEncryptionKeyFactory keyFactory = datanode
-            .getDataEncryptionKeyFactoryForBlock(extendedBlock);
-        IOStreamPair saslStreams = datanode.getSaslClient().socketSend(sock,
-            unbufOut, unbufIn, keyFactory, accessToken, target);
-        unbufOut = saslStreams.out;
-        unbufIn = saslStreams.in;
-        out = new DataOutputStream(
-            new BufferedOutputStream(unbufOut, ioFileBufferSize));
-        in = new DataInputStream(
-            new BufferedInputStream(unbufIn, ioFileBufferSize));
-        sendRequest(out, extendedBlock, accessToken, source, targetStorageType);
-        receiveResponse(in);
+            .getDataEncryptionKeyFactoryForBlock(eb);
 
-        LOG.info(
-            "Successfully moved block:{} from src:{} to destin:{} for"
-                + " satisfying storageType:{}",
-            block, source, target, targetStorageType);
-        return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
-      } catch (BlockPinningException e) {
-        // Pinned block won't be able to move to a different node. So, its not
-        // required to do retries, just marked as SUCCESS.
-        LOG.debug("Pinned block can't be moved, so skipping block:{}", block,
-            e);
-        return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
+        return blkDispatcher.moveBlock(blkMovingInfo,
+            datanode.getSaslClient(), eb, datanode.newSocket(),
+            keyFactory, accessToken);
       } catch (IOException e) {
         // TODO: handle failure retries
         LOG.warn(
             "Failed to move block:{} from src:{} to destin:{} to satisfy "
                 + "storageType:{}",
-                block, source, target, targetStorageType, e);
+            blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
+            blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e);
         return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
       } finally {
         datanode.decrementXmitsInProgress();
-        IOUtils.closeStream(out);
-        IOUtils.closeStream(in);
-        IOUtils.closeSocket(sock);
-      }
-    }
-
-    /** Send a reportedBlock replace request to the output stream. */
-    private void sendRequest(DataOutputStream out, ExtendedBlock eb,
-        Token<BlockTokenIdentifier> accessToken, DatanodeInfo srcDn,
-        StorageType destinStorageType) throws IOException {
-      new Sender(out).replaceBlock(eb, destinStorageType, accessToken,
-          srcDn.getDatanodeUuid(), srcDn, null);
-    }
-
-    /** Receive a reportedBlock copy response from the input stream. */
-    private void receiveResponse(DataInputStream in) throws IOException {
-      BlockOpResponseProto response = BlockOpResponseProto
-          .parseFrom(vintPrefixed(in));
-      while (response.getStatus() == Status.IN_PROGRESS) {
-        // read intermediate responses
-        response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
-      }
-      String logInfo = "reportedBlock move is failed";
-      DataTransferProtoUtil.checkBlockOpStatus(response, logInfo, true);
-    }
-  }
-
-  /**
-   * Block movement status code.
-   */
-  public enum BlockMovementStatus {
-    /** Success. */
-    DN_BLK_STORAGE_MOVEMENT_SUCCESS(0),
-    /**
-     * Failure due to generation time stamp mismatches or network errors
-     * or no available space.
-     */
-    DN_BLK_STORAGE_MOVEMENT_FAILURE(-1);
-
-    // TODO: need to support different type of failures. Failure due to network
-    // errors, block pinned, no space available etc.
-
-    private final int code;
-
-    BlockMovementStatus(int code) {
-      this.code = code;
-    }
-
-    /**
-     * @return the status code.
-     */
-    int getStatusCode() {
-      return code;
-    }
-  }
-
-  /**
-   * This class represents status from a block movement task. This will have the
-   * information of the task which was successful or failed due to errors.
-   */
-  static class BlockMovementAttemptFinished {
-    private final Block block;
-    private final DatanodeInfo src;
-    private final DatanodeInfo target;
-    private final BlockMovementStatus status;
-
-    BlockMovementAttemptFinished(Block block, DatanodeInfo src,
-        DatanodeInfo target, BlockMovementStatus status) {
-      this.block = block;
-      this.src = src;
-      this.target = target;
-      this.status = status;
-    }
-
-    Block getBlock() {
-      return block;
-    }
-
-    BlockMovementStatus getStatus() {
-      return status;
-    }
-
-    @Override
-    public String toString() {
-      return new StringBuilder().append("Block movement attempt finished(\n  ")
-          .append(" block : ")
-          .append(block).append(" src node: ").append(src)
-          .append(" target node: ").append(target)
-          .append(" movement status: ").append(status).append(")").toString();
-    }
-  }
-
-  /**
-   * Blocks movements status handler, which is used to collect details of the
-   * completed block movements and it will send these attempted finished(with
-   * success or failure) blocks to the namenode via heartbeat.
-   */
-  public static class BlocksMovementsStatusHandler {
-    private final List<Block> blockIdVsMovementStatus =
-        new ArrayList<>();
-
-    /**
-     * Collect all the storage movement attempt finished blocks. Later this will
-     * be send to namenode via heart beat.
-     *
-     * @param moveAttemptFinishedBlks
-     *          set of storage movement attempt finished blocks
-     */
-    void handle(List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) {
-      List<Block> blocks = new ArrayList<>();
-
-      for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) {
-        blocks.add(item.getBlock());
-      }
-      // Adding to the tracking report list. Later this will be send to
-      // namenode via datanode heartbeat.
-      synchronized (blockIdVsMovementStatus) {
-        blockIdVsMovementStatus.addAll(blocks);
-      }
-    }
-
-    /**
-     * @return unmodifiable list of storage movement attempt finished blocks.
-     */
-    List<Block> getMoveAttemptFinishedBlocks() {
-      List<Block> moveAttemptFinishedBlks = new ArrayList<>();
-      // 1. Adding all the completed block ids.
-      synchronized (blockIdVsMovementStatus) {
-        if (blockIdVsMovementStatus.size() > 0) {
-          moveAttemptFinishedBlks = Collections
-              .unmodifiableList(blockIdVsMovementStatus);
-        }
-      }
-      return moveAttemptFinishedBlks;
-    }
-
-    /**
-     * Remove the storage movement attempt finished blocks from the tracking
-     * list.
-     *
-     * @param moveAttemptFinishedBlks
-     *          set of storage movement attempt finished blocks
-     */
-    void remove(List<Block> moveAttemptFinishedBlks) {
-      if (moveAttemptFinishedBlks != null) {
-        blockIdVsMovementStatus.removeAll(moveAttemptFinishedBlks);
-      }
-    }
-
-    /**
-     * Clear the blockID vs movement status tracking map.
-     */
-    void removeAll() {
-      synchronized (blockIdVsMovementStatus) {
-        blockIdVsMovementStatus.clear();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index ed1c823..37322e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1299,7 +1299,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
               blockManager.getSPSService()),
           new IntraSPSNameNodeFileIdCollector(getFSDirectory(),
               blockManager.getSPSService()),
-          new IntraSPSNameNodeBlockMoveTaskHandler(blockManager, this));
+          new IntraSPSNameNodeBlockMoveTaskHandler(blockManager, this), null);
       blockManager.startSPS();
     } finally {
       startingActiveService = false;
@@ -3996,7 +3996,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
                     + "  movement attempt finished block info sent by DN");
           }
         } else {
-          sps.handleStorageMovementAttemptFinishedBlks(blksMovementsFinished);
+          sps.notifyStorageMovementAttemptFinishedBlks(blksMovementsFinished);
         }
       }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java
index e6f78e1..1b11d01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java
@@ -38,7 +38,6 @@ public interface BlockMoveTaskHandler {
    * contain the required info to move the block, that source location,
    * destination location and storage types.
    */
-  void submitMoveTask(BlockMovingInfo blkMovingInfo,
-      BlockMovementListener blockMoveCompletionListener) throws IOException;
+  void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException;
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
index 3f0155d..ea7a093 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
@@ -46,8 +46,7 @@ import com.google.common.annotations.VisibleForTesting;
  * finished for a longer time period, then such items will retries automatically
  * after timeout. The default timeout would be 5 minutes.
  */
-public class BlockStorageMovementAttemptedItems
-    implements BlockMovementListener {
+public class BlockStorageMovementAttemptedItems{
   private static final Logger LOG =
       LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
 
@@ -59,6 +58,7 @@ public class BlockStorageMovementAttemptedItems
   private final List<Block> movementFinishedBlocks;
   private volatile boolean monitorRunning = true;
   private Daemon timerThread = null;
+  private final BlockMovementListener blkMovementListener;
   //
   // It might take anywhere between 5 to 10 minutes before
   // a request is timed out.
@@ -74,7 +74,8 @@ public class BlockStorageMovementAttemptedItems
   private final SPSService service;
 
   public BlockStorageMovementAttemptedItems(SPSService service,
-      BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
+      BlockStorageMovementNeeded unsatisfiedStorageMovementFiles,
+      BlockMovementListener blockMovementListener) {
     this.service = service;
     long recheckTimeout = this.service.getConf().getLong(
         DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
@@ -89,6 +90,7 @@ public class BlockStorageMovementAttemptedItems
     this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
     storageMovementAttemptedItems = new ArrayList<>();
     movementFinishedBlocks = new ArrayList<>();
+    this.blkMovementListener = blockMovementListener;
   }
 
   /**
@@ -118,6 +120,10 @@ public class BlockStorageMovementAttemptedItems
     synchronized (movementFinishedBlocks) {
       movementFinishedBlocks.addAll(Arrays.asList(moveAttemptFinishedBlks));
     }
+    // External listener if it is plugged-in
+    if (blkMovementListener != null) {
+      blkMovementListener.notifyMovementTriedBlocks(moveAttemptFinishedBlks);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java
index b27e8c9..d6e92d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java
@@ -44,8 +44,7 @@ public class IntraSPSNameNodeBlockMoveTaskHandler
   }
 
   @Override
-  public void submitMoveTask(BlockMovingInfo blkMovingInfo,
-      BlockMovementListener blockMoveCompletionListener) throws IOException {
+  public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException {
     namesystem.readLock();
     try {
       DatanodeDescriptor dn = blockManager.getDatanodeManager()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
index d74e391..ecc6ceb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 
 /**
  * An interface for SPSService, which exposes life cycle and processing APIs.
@@ -41,9 +42,11 @@ public interface SPSService {
    *          id
    * @param handler
    *          - a helper service for moving the blocks
+   * @param blkMovementListener
+   *          - listener to know about block movement attempt completion
    */
   void init(Context ctxt, FileIdCollector fileIDCollector,
-      BlockMoveTaskHandler handler);
+      BlockMoveTaskHandler handler, BlockMovementListener blkMovementListener);
 
   /**
    * Starts the SPS service. Make sure to initialize the helper services before
@@ -112,4 +115,13 @@ public interface SPSService {
    *          - directory inode id.
    */
   void markScanCompletedForPath(Long inodeId);
+
+  /**
+   * Notify the details of storage movement attempt finished blocks.
+   *
+   * @param moveAttemptFinishedBlks
+   *          - array contains all the blocks that are attempted to move
+   */
+  void notifyStorageMovementAttemptFinishedBlks(
+      BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index aafdc65..9ba8af7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -132,13 +132,14 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
   }
 
   public void init(final Context context, final FileIdCollector fileIDCollector,
-      final BlockMoveTaskHandler blockMovementTaskHandler) {
+      final BlockMoveTaskHandler blockMovementTaskHandler,
+      final BlockMovementListener blockMovementListener) {
     this.ctxt = context;
     this.storageMovementNeeded =
         new BlockStorageMovementNeeded(context, fileIDCollector);
     this.storageMovementsMonitor =
         new BlockStorageMovementAttemptedItems(this,
-        storageMovementNeeded);
+        storageMovementNeeded, blockMovementListener);
     this.blockMoveTaskHandler = blockMovementTaskHandler;
     this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(getConf());
     this.blockMovementMaxRetry = getConf().getInt(
@@ -291,6 +292,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
                       + " back to retry queue as some of the blocks"
                       + " are low redundant.");
                 }
+                itemInfo.increRetryCount();
                 this.storageMovementNeeded.add(itemInfo);
                 break;
               case BLOCKS_FAILED_TO_MOVE:
@@ -410,15 +412,18 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
             liveDns, ecPolicy);
         if (blocksPaired) {
           status = BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED;
-        } else {
-          // none of the blocks found its eligible targets for satisfying the
-          // storage policy.
+        } else
+          if (status != BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) {
+          // Check if the previous block was successfully paired. Here the
+          // status will set to NO_BLOCKS_TARGETS_PAIRED only when none of the
+          // blocks of a file found its eligible targets to satisfy the storage
+          // policy.
           status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED;
         }
-      } else {
-        if (hasLowRedundancyBlocks) {
-          status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
-        }
+      } else if (hasLowRedundancyBlocks
+          && status != BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) {
+        // Check if the previous block was successfully paired.
+        status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
       }
     }
 
@@ -426,8 +431,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
       // Check for at least one block storage movement has been chosen
       try {
-        blockMoveTaskHandler.submitMoveTask(blkMovingInfo,
-            storageMovementsMonitor);
+        blockMoveTaskHandler.submitMoveTask(blkMovingInfo);
         LOG.debug("BlockMovingInfo: {}", blkMovingInfo);
         assignedBlockIds.add(blkMovingInfo.getBlock());
         blockCount++;
@@ -823,7 +827,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
    * @param moveAttemptFinishedBlks
    *          set of storage movement attempt finished blocks.
    */
-  public void handleStorageMovementAttemptFinishedBlks(
+  public void notifyStorageMovementAttemptFinishedBlks(
       BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks) {
     if (moveAttemptFinishedBlks.getBlocks().length <= 0) {
       return;
@@ -833,7 +837,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
   }
 
   @VisibleForTesting
-  BlockMovementListener getAttemptedItemsMonitor() {
+  BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() {
     return storageMovementsMonitor;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
new file mode 100644
index 0000000..a1c8eec
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.sps;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.balancer.KeyManager;
+import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished;
+import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus;
+import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker;
+import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler;
+import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
+import org.apache.hadoop.hdfs.server.namenode.sps.BlockMoveTaskHandler;
+import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Daemon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class handles the external SPS block movements. This will move the
+ * given block to a target datanode by directly establishing socket connection
+ * to it and invokes function
+ * {@link Sender#replaceBlock(ExtendedBlock, StorageType, Token, String,
+ * DatanodeInfo, String)}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(ExternalSPSBlockMoveTaskHandler.class);
+
+  private final ExecutorService moveExecutor;
+  private final CompletionService<BlockMovementAttemptFinished> mCompletionServ;
+  private final NameNodeConnector nnc;
+  private final SaslDataTransferClient saslClient;
+  private final BlockStorageMovementTracker blkMovementTracker;
+  private Daemon movementTrackerThread;
+  private final SPSService service;
+  private final BlockDispatcher blkDispatcher;
+
+  public ExternalSPSBlockMoveTaskHandler(Configuration conf,
+      NameNodeConnector nnc, SPSService spsService) {
+    int moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
+        DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
+    moveExecutor = initializeBlockMoverThreadPool(moverThreads);
+    mCompletionServ = new ExecutorCompletionService<>(moveExecutor);
+    this.nnc = nnc;
+    this.saslClient = new SaslDataTransferClient(conf,
+        DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+        TrustedChannelResolver.getInstance(conf),
+        nnc.getFallbackToSimpleAuth());
+    this.blkMovementTracker = new BlockStorageMovementTracker(
+        mCompletionServ, new ExternalBlocksMovementsStatusHandler());
+    this.service = spsService;
+
+    boolean connectToDnViaHostname = conf.getBoolean(
+        HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME,
+        HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
+    int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
+    blkDispatcher = new BlockDispatcher(HdfsConstants.READ_TIMEOUT,
+        ioFileBufferSize, connectToDnViaHostname);
+  }
+
+  /**
+   * Initializes block movement tracker daemon and starts the thread.
+   */
+  void init() {
+    movementTrackerThread = new Daemon(this.blkMovementTracker);
+    movementTrackerThread.setName("BlockStorageMovementTracker");
+    movementTrackerThread.start();
+  }
+
+  private ThreadPoolExecutor initializeBlockMoverThreadPool(int num) {
+    LOG.debug("Block mover to satisfy storage policy; pool threads={}", num);
+
+    ThreadPoolExecutor moverThreadPool = new ThreadPoolExecutor(1, num, 60,
+        TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+        new Daemon.DaemonFactory() {
+          private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+          @Override
+          public Thread newThread(Runnable r) {
+            Thread t = super.newThread(r);
+            t.setName("BlockMoverTask-" + threadIndex.getAndIncrement());
+            return t;
+          }
+        }, new ThreadPoolExecutor.CallerRunsPolicy() {
+          @Override
+          public void rejectedExecution(Runnable runnable,
+              ThreadPoolExecutor e) {
+            LOG.info("Execution for block movement to satisfy storage policy"
+                + " got rejected, Executing in current thread");
+            // will run in the current thread.
+            super.rejectedExecution(runnable, e);
+          }
+        });
+
+    moverThreadPool.allowCoreThreadTimeOut(true);
+    return moverThreadPool;
+  }
+
+  @Override
+  public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException {
+    // TODO: Need to increment scheduled block size on the target node. This
+    // count will be used to calculate the remaining space of target datanode
+    // during block movement assignment logic. In the internal movement,
+    // remaining space is bookkeeping at the DatanodeDescriptor, please refer
+    // IntraSPSNameNodeBlockMoveTaskHandler#submitMoveTask implementation and
+    // updating via the funcation call -
+    // dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType());
+    LOG.debug("Received BlockMovingTask {}", blkMovingInfo);
+    BlockMovingTask blockMovingTask = new BlockMovingTask(blkMovingInfo);
+    Future<BlockMovementAttemptFinished> moveCallable = mCompletionServ
+        .submit(blockMovingTask);
+    blkMovementTracker.addBlock(blkMovingInfo.getBlock(), moveCallable);
+  }
+
+  private class ExternalBlocksMovementsStatusHandler
+      extends BlocksMovementsStatusHandler {
+    @Override
+    public void handle(
+        List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) {
+      List<Block> blocks = new ArrayList<>();
+      for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) {
+        blocks.add(item.getBlock());
+      }
+      BlocksStorageMoveAttemptFinished blkAttempted =
+          new BlocksStorageMoveAttemptFinished(
+          blocks.toArray(new Block[blocks.size()]));
+      service.notifyStorageMovementAttemptFinishedBlks(blkAttempted);
+    }
+  }
+
+  /**
+   * This class encapsulates the process of moving the block replica to the
+   * given target.
+   */
+  private class BlockMovingTask
+      implements Callable<BlockMovementAttemptFinished> {
+    private final BlockMovingInfo blkMovingInfo;
+
+    BlockMovingTask(BlockMovingInfo blkMovingInfo) {
+      this.blkMovingInfo = blkMovingInfo;
+    }
+
+    @Override
+    public BlockMovementAttemptFinished call() {
+      BlockMovementStatus blkMovementStatus = moveBlock();
+      return new BlockMovementAttemptFinished(blkMovingInfo.getBlock(),
+          blkMovingInfo.getSource(), blkMovingInfo.getTarget(),
+          blkMovementStatus);
+    }
+
+    private BlockMovementStatus moveBlock() {
+      ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
+          blkMovingInfo.getBlock());
+
+      final KeyManager km = nnc.getKeyManager();
+      Token<BlockTokenIdentifier> accessToken;
+      try {
+        accessToken = km.getAccessToken(eb,
+            new StorageType[]{blkMovingInfo.getTargetStorageType()},
+            new String[0]);
+      } catch (IOException e) {
+        // TODO: handle failure retries
+        LOG.warn(
+            "Failed to move block:{} from src:{} to destin:{} to satisfy "
+                + "storageType:{}",
+            blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
+            blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e);
+        return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
+      }
+      return blkDispatcher.moveBlock(blkMovingInfo, saslClient, eb,
+          new Socket(), km, accessToken);
+    }
+  }
+
+  /**
+   * Cleanup the resources.
+   */
+  void cleanUp() {
+    blkMovementTracker.stopTracking();
+    if (movementTrackerThread != null) {
+      movementTrackerThread.interrupt();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
index 3e2c324..4097339 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
@@ -56,7 +56,7 @@ public class TestBlockStorageMovementAttemptedItems {
     unsatisfiedStorageMovementFiles =
         new BlockStorageMovementNeeded(ctxt, null);
     bsmAttemptedItems = new BlockStorageMovementAttemptedItems(sps,
-        unsatisfiedStorageMovementFiles);
+        unsatisfiedStorageMovementFiles, null);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
index e0bf410..8115661 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
@@ -92,7 +92,7 @@ public class TestStoragePolicySatisfier {
 
   private static final String ONE_SSD = "ONE_SSD";
   private static final String COLD = "COLD";
-  private static final Logger LOG =
+  protected static final Logger LOG =
       LoggerFactory.getLogger(TestStoragePolicySatisfier.class);
   private Configuration config = null;
   private StorageType[][] allDiskTypes =
@@ -1337,7 +1337,7 @@ public class TestStoragePolicySatisfier {
     };
 
     FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt);
-    sps.init(ctxt, fileIDCollector, null);
+    sps.init(ctxt, fileIDCollector, null, null);
     sps.getStorageMovementQueue().activate();
 
     INode rootINode = fsDir.getINode("/root");
@@ -1404,7 +1404,7 @@ public class TestStoragePolicySatisfier {
       }
     };
     FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt);
-    sps.init(ctxt, fileIDCollector, null);
+    sps.init(ctxt, fileIDCollector, null, null);
     sps.getStorageMovementQueue().activate();
 
     INode rootINode = fsDir.getINode("/root");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
index 3ced34e..9a401bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
@@ -18,20 +18,33 @@
 package org.apache.hadoop.hdfs.server.sps;
 
 import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
 import org.apache.hadoop.hdfs.server.namenode.sps.Context;
 import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector;
-import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler;
 import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
 import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier;
+import org.junit.Assert;
 import org.junit.Ignore;
 
+import com.google.common.collect.Maps;
+
 /**
  * Tests the external sps service plugins.
  */
@@ -69,23 +82,24 @@ public class TestExternalStoragePolicySatisfier
     cluster.waitActive();
     if (conf.getBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
         false)) {
-      SPSService spsService = cluster.getNameNode().getNamesystem()
-          .getBlockManager().getSPSService();
+      BlockManager blkMgr = cluster.getNameNode().getNamesystem()
+          .getBlockManager();
+      SPSService spsService = blkMgr.getSPSService();
       spsService.stopGracefully();
 
       IntraSPSNameNodeContext context = new IntraSPSNameNodeContext(
           cluster.getNameNode().getNamesystem(),
-          cluster.getNameNode().getNamesystem().getBlockManager(), cluster
-              .getNameNode().getNamesystem().getBlockManager().getSPSService());
-
+          blkMgr, blkMgr.getSPSService());
+      ExternalBlockMovementListener blkMoveListener =
+          new ExternalBlockMovementListener();
+      ExternalSPSBlockMoveTaskHandler externalHandler =
+          new ExternalSPSBlockMoveTaskHandler(conf, getNameNodeConnector(conf),
+              blkMgr.getSPSService());
+      externalHandler.init();
       spsService.init(context,
-          new ExternalSPSFileIDCollector(context,
-              cluster.getNameNode().getNamesystem().getBlockManager()
-                  .getSPSService(),
-              5),
-          new IntraSPSNameNodeBlockMoveTaskHandler(
-              cluster.getNameNode().getNamesystem().getBlockManager(),
-              cluster.getNameNode().getNamesystem()));
+          new ExternalSPSFileIDCollector(context, blkMgr.getSPSService(), 5),
+          externalHandler,
+          blkMoveListener);
       spsService.start(true);
     }
     return cluster;
@@ -97,6 +111,35 @@ public class TestExternalStoragePolicySatisfier
     return new ExternalSPSFileIDCollector(ctxt, sps, 5);
   }
 
+  private class ExternalBlockMovementListener implements BlockMovementListener {
+
+    private List<Block> actualBlockMovements = new ArrayList<>();
+
+    @Override
+    public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
+      for (Block block : moveAttemptFinishedBlks) {
+        actualBlockMovements.add(block);
+      }
+      LOG.info("Movement attempted blocks", actualBlockMovements);
+    }
+  }
+
+  private NameNodeConnector getNameNodeConnector(Configuration conf)
+      throws IOException {
+    final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
+    Assert.assertEquals(1, namenodes.size());
+    Map<URI, List<Path>> nnMap = Maps.newHashMap();
+    for (URI nn : namenodes) {
+      nnMap.put(nn, null);
+    }
+    final Path externalSPSPathId = new Path("/system/externalSPS.id");
+    final List<NameNodeConnector> nncs = NameNodeConnector
+        .newNameNodeConnectors(nnMap,
+            StoragePolicySatisfier.class.getSimpleName(), externalSPSPathId,
+            conf, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
+    return nncs.get(0);
+  }
+
   /**
    * This test need not run as external scan is not a batch based scanning right
    * now.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org