You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by um...@apache.org on 2014/11/26 19:00:53 UTC

hadoop git commit: HDFS-7310. Mover can give first priority to local DN if it has target storage type available in local DN. (Vinayakumar B via umamahesh)

Repository: hadoop
Updated Branches:
  refs/heads/trunk aa7dac335 -> 058af60c5


HDFS-7310. Mover can give first priority to local DN if it has target storage type available in local DN. (Vinayakumar B via umamahesh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/058af60c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/058af60c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/058af60c

Branch: refs/heads/trunk
Commit: 058af60c56207907f2bedf76df4284e86d923e0c
Parents: aa7dac3
Author: Uma Maheswara Rao G <um...@apache.org>
Authored: Wed Nov 26 23:27:25 2014 +0530
Committer: Uma Maheswara Rao G <um...@apache.org>
Committed: Wed Nov 26 23:27:25 2014 +0530

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hadoop/hdfs/server/balancer/Dispatcher.java |  17 +--
 .../server/blockmanagement/BlockManager.java    |  18 ++-
 .../hdfs/server/blockmanagement/BlocksMap.java  |   6 +-
 .../blockmanagement/DatanodeStorageInfo.java    |  14 +-
 .../hdfs/server/datanode/DataXceiver.java       | 141 ++++++++++---------
 .../server/datanode/fsdataset/FsDatasetSpi.java |   7 +
 .../datanode/fsdataset/impl/BlockPoolSlice.java |   4 +
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 134 ++++++++++++++----
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |   4 +
 .../impl/RamDiskAsyncLazyPersistService.java    |   2 +-
 .../apache/hadoop/hdfs/server/mover/Mover.java  |  29 +++-
 .../server/blockmanagement/TestBlockInfo.java   |   4 +-
 .../blockmanagement/TestDatanodeDescriptor.java |   8 +-
 .../server/datanode/SimulatedFSDataset.java     |   7 +
 .../server/datanode/TestBlockReplacement.java   | 101 ++++++++++---
 .../hadoop/hdfs/server/mover/TestMover.java     |  48 ++++++-
 17 files changed, 397 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 89e67eb..12219a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -401,6 +401,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-6803 Document DFSClient#DFSInputStream expectations reading and preading
     in concurrent context. (stack via stevel)
 
+    HDFS-7310. Mover can give first priority to local DN if it has target storage type 
+    available in local DN. (Vinayakumar B via umamahesh)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 6ede40a..63e151c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -243,6 +243,10 @@ public class Dispatcher {
      */
     private boolean chooseProxySource() {
       final DatanodeInfo targetDN = target.getDatanodeInfo();
+      // if source and target are on the same datanode, no separate proxy is needed
+      if (source.getDatanodeInfo().equals(targetDN) && addTo(source)) {
+        return true;
+      }
       // if node group is supported, first try add nodes in the same node group
       if (cluster.isNodeGroupAware()) {
         for (StorageGroup loc : block.getLocations()) {
@@ -375,19 +379,6 @@ public class Dispatcher {
     public DBlock(Block block) {
       super(block);
     }
-
-    @Override
-    public synchronized boolean isLocatedOn(StorageGroup loc) {
-      // currently we only check if replicas are located on the same DataNodes
-      // since we do not have the capability to store two replicas in the same
-      // DataNode even though they are on two different storage types
-      for (StorageGroup existing : locations) {
-        if (existing.getDatanodeInfo().equals(loc.getDatanodeInfo())) {
-          return true;
-        }
-      }
-      return false;
-    }
   }
 
   /** The class represents a desired move. */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 254643c..2676696 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.fs.FileEncryptionInfo;
-
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
@@ -63,6 +62,7 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.Acces
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -2000,8 +2000,9 @@ public class BlockManager {
     // place a delimiter in the list which separates blocks 
     // that have been reported from those that have not
     BlockInfo delimiter = new BlockInfo(new Block(), (short) 1);
-    boolean added = storageInfo.addBlock(delimiter);
-    assert added : "Delimiting block cannot be present in the node";
+    AddBlockResult result = storageInfo.addBlock(delimiter);
+    assert result == AddBlockResult.ADDED 
+        : "Delimiting block cannot be present in the node";
     int headIndex = 0; //currently the delimiter is in the head of the list
     int curIndex;
 
@@ -2394,14 +2395,19 @@ public class BlockManager {
     assert bc != null : "Block must belong to a file";
 
     // add block to the datanode
-    boolean added = storageInfo.addBlock(storedBlock);
+    AddBlockResult result = storageInfo.addBlock(storedBlock);
 
     int curReplicaDelta;
-    if (added) {
+    if (result == AddBlockResult.ADDED) {
       curReplicaDelta = 1;
       if (logEveryBlock) {
         logAddStoredBlock(storedBlock, node);
       }
+    } else if (result == AddBlockResult.REPLACED) {
+      curReplicaDelta = 0;
+      blockLog.warn("BLOCK* addStoredBlock: " + "block " + storedBlock
+          + " moved to storageType " + storageInfo.getStorageType()
+          + " on node " + node);
     } else {
       // if the same block is added again and the replica was corrupt
       // previously because of a wrong gen stamp, remove it from the
@@ -2423,7 +2429,7 @@ public class BlockManager {
     if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
         numLiveReplicas >= minReplication) {
       storedBlock = completeBlock(bc, storedBlock, false);
-    } else if (storedBlock.isComplete() && added) {
+    } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
       // check whether safe replication is reached for the block
       // only complete blocks are counted towards that
       // Is no-op if not in safe mode.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
index a675635..6664034 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 import java.util.Iterator;
 
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.util.GSet;
 import org.apache.hadoop.util.LightWeightGSet;
@@ -223,8 +224,9 @@ class BlocksMap {
       final boolean removed = storage.removeBlock(currentBlock);
       Preconditions.checkState(removed, "currentBlock not found.");
 
-      final boolean added = storage.addBlock(newBlock);
-      Preconditions.checkState(added, "newBlock already exists.");
+      final AddBlockResult result = storage.addBlock(newBlock);
+      Preconditions.checkState(result == AddBlockResult.ADDED,
+          "newBlock already exists.");
     }
     // replace block in the map itself
     blocks.put(newBlock);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 8c44b30..a3198e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -215,10 +215,10 @@ public class DatanodeStorageInfo {
     return blockPoolUsed;
   }
 
-  public boolean addBlock(BlockInfo b) {
+  public AddBlockResult addBlock(BlockInfo b) {
     // First check whether the block belongs to a different storage
     // on the same DN.
-    boolean replaced = false;
+    AddBlockResult result = AddBlockResult.ADDED;
     DatanodeStorageInfo otherStorage =
         b.findStorageInfo(getDatanodeDescriptor());
 
@@ -226,10 +226,10 @@ public class DatanodeStorageInfo {
       if (otherStorage != this) {
         // The block belongs to a different storage. Remove it first.
         otherStorage.removeBlock(b);
-        replaced = true;
+        result = AddBlockResult.REPLACED;
       } else {
         // The block is already associated with this storage.
-        return false;
+        return AddBlockResult.ALREADY_EXIST;
       }
     }
 
@@ -237,7 +237,7 @@ public class DatanodeStorageInfo {
     b.addStorage(this);
     blockList = b.listInsert(blockList, this);
     numBlocks++;
-    return !replaced;
+    return result;
   }
 
   boolean removeBlock(BlockInfo b) {
@@ -358,4 +358,8 @@ public class DatanodeStorageInfo {
     }
     return null;
   }
+
+  static enum AddBlockResult {
+    ADDED, REPLACED, ALREADY_EXIST;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 61b9c67..bbc23e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -1007,7 +1007,6 @@ class DataXceiver extends Receiver implements Runnable {
     updateCurrentThreadName("Replacing block " + block + " from " + delHint);
 
     /* read header */
-    block.setNumBytes(dataXceiverServer.estimateBlockSize);
     if (datanode.isBlockTokenEnabled) {
       try {
         datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
@@ -1039,73 +1038,83 @@ class DataXceiver extends Receiver implements Runnable {
     DataOutputStream replyOut = new DataOutputStream(getOutputStream());
     boolean IoeDuringCopyBlockOperation = false;
     try {
-      // get the output stream to the proxy
-      final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Connecting to datanode " + dnAddr);
-      }
-      InetSocketAddress proxyAddr = NetUtils.createSocketAddr(dnAddr);
-      proxySock = datanode.newSocket();
-      NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout);
-      proxySock.setSoTimeout(dnConf.socketTimeout);
-
-      OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
-          dnConf.socketWriteTimeout);
-      InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
-      DataEncryptionKeyFactory keyFactory =
-        datanode.getDataEncryptionKeyFactoryForBlock(block);
-      IOStreamPair saslStreams = datanode.saslClient.socketSend(proxySock,
-        unbufProxyOut, unbufProxyIn, keyFactory, blockToken, proxySource);
-      unbufProxyOut = saslStreams.out;
-      unbufProxyIn = saslStreams.in;
-      
-      proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut, 
-          HdfsConstants.SMALL_BUFFER_SIZE));
-      proxyReply = new DataInputStream(new BufferedInputStream(unbufProxyIn,
-          HdfsConstants.IO_FILE_BUFFER_SIZE));
-
-      /* send request to the proxy */
-      IoeDuringCopyBlockOperation = true;
-      new Sender(proxyOut).copyBlock(block, blockToken);
-      IoeDuringCopyBlockOperation = false;
-
-      // receive the response from the proxy
-      
-      BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
-          PBHelper.vintPrefixed(proxyReply));
-
-      if (copyResponse.getStatus() != SUCCESS) {
-        if (copyResponse.getStatus() == ERROR_ACCESS_TOKEN) {
+      // Move the block to a different storage on the same datanode
+      if (proxySource.equals(datanode.getDatanodeId())) {
+        ReplicaInfo oldReplica = datanode.data.moveBlockAcrossStorage(block,
+            storageType);
+        if (oldReplica != null) {
+          LOG.info("Moved " + block + " from StorageType "
+              + oldReplica.getVolume().getStorageType() + " to " + storageType);
+        }
+      } else {
+        block.setNumBytes(dataXceiverServer.estimateBlockSize);
+        // get the output stream to the proxy
+        final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Connecting to datanode " + dnAddr);
+        }
+        InetSocketAddress proxyAddr = NetUtils.createSocketAddr(dnAddr);
+        proxySock = datanode.newSocket();
+        NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout);
+        proxySock.setSoTimeout(dnConf.socketTimeout);
+        
+        OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
+            dnConf.socketWriteTimeout);
+        InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
+        DataEncryptionKeyFactory keyFactory =
+            datanode.getDataEncryptionKeyFactoryForBlock(block);
+        IOStreamPair saslStreams = datanode.saslClient.socketSend(proxySock,
+            unbufProxyOut, unbufProxyIn, keyFactory, blockToken, proxySource);
+        unbufProxyOut = saslStreams.out;
+        unbufProxyIn = saslStreams.in;
+        
+        proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut, 
+            HdfsConstants.SMALL_BUFFER_SIZE));
+        proxyReply = new DataInputStream(new BufferedInputStream(unbufProxyIn,
+            HdfsConstants.IO_FILE_BUFFER_SIZE));
+        
+        /* send request to the proxy */
+        IoeDuringCopyBlockOperation = true;
+        new Sender(proxyOut).copyBlock(block, blockToken);
+        IoeDuringCopyBlockOperation = false;
+        
+        // receive the response from the proxy
+        
+        BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
+            PBHelper.vintPrefixed(proxyReply));
+        
+        if (copyResponse.getStatus() != SUCCESS) {
+          if (copyResponse.getStatus() == ERROR_ACCESS_TOKEN) {
+            throw new IOException("Copy block " + block + " from "
+                + proxySock.getRemoteSocketAddress()
+                + " failed due to access token error");
+          }
           throw new IOException("Copy block " + block + " from "
-              + proxySock.getRemoteSocketAddress()
-              + " failed due to access token error");
+              + proxySock.getRemoteSocketAddress() + " failed");
         }
-        throw new IOException("Copy block " + block + " from "
-            + proxySock.getRemoteSocketAddress() + " failed");
+        
+        // get checksum info about the block we're copying
+        ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
+        DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
+            checksumInfo.getChecksum());
+        // open a block receiver and check if the block does not exist
+        blockReceiver = new BlockReceiver(block, storageType,
+            proxyReply, proxySock.getRemoteSocketAddress().toString(),
+            proxySock.getLocalSocketAddress().toString(),
+            null, 0, 0, 0, "", null, datanode, remoteChecksum,
+            CachingStrategy.newDropBehind(), false);
+        
+        // receive a block
+        blockReceiver.receiveBlock(null, null, replyOut, null, 
+            dataXceiverServer.balanceThrottler, null, true);
+        
+        // notify name node
+        datanode.notifyNamenodeReceivedBlock(
+            block, delHint, blockReceiver.getStorageUuid());
+        
+        LOG.info("Moved " + block + " from " + peer.getRemoteAddressString()
+            + ", delHint=" + delHint);
       }
-      
-      // get checksum info about the block we're copying
-      ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
-      DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
-          checksumInfo.getChecksum());
-      // open a block receiver and check if the block does not exist
-      blockReceiver = new BlockReceiver(block, storageType,
-          proxyReply, proxySock.getRemoteSocketAddress().toString(),
-          proxySock.getLocalSocketAddress().toString(),
-          null, 0, 0, 0, "", null, datanode, remoteChecksum,
-          CachingStrategy.newDropBehind(), false);
-
-      // receive a block
-      blockReceiver.receiveBlock(null, null, replyOut, null, 
-          dataXceiverServer.balanceThrottler, null, true);
-                    
-      // notify name node
-      datanode.notifyNamenodeReceivedBlock(
-          block, delHint, blockReceiver.getStorageUuid());
-
-      LOG.info("Moved " + block + " from " + peer.getRemoteAddressString()
-          + ", delHint=" + delHint);
-      
     } catch (IOException ioe) {
       opStatus = ERROR;
       errMsg = "opReplaceBlock " + block + " received exception " + ioe; 
@@ -1117,7 +1126,7 @@ class DataXceiver extends Receiver implements Runnable {
       throw ioe;
     } finally {
       // receive the last byte that indicates the proxy released its thread resource
-      if (opStatus == SUCCESS) {
+      if (opStatus == SUCCESS && proxyReply != null) {
         try {
           proxyReply.readChar();
         } catch (IOException ignored) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index a02ee0a..462ad31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
@@ -508,4 +509,10 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
     * Callback from RamDiskAsyncLazyPersistService upon async lazy persist task fail
     */
    public void onFailLazyPersist(String bpId, long blockId);
+
+    /**
+     * Move a block from its current storage to a storage of the given type.
+     */
+    public ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
+        StorageType targetStorageType) throws IOException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 5c8709c..77cdb91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -157,6 +157,10 @@ class BlockPoolSlice {
     return rbwDir;
   }
 
+  File getTmpDir() {
+    return tmpDir;
+  }
+
   /** Run DU on local drives.  It must be synchronized from caller. */
   void decDfsUsed(long value) {
     dfsUsage.decDfsUsed(value);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 4a89778..2c6f409 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -663,13 +663,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @return the new meta and block files.
    * @throws IOException
    */
-  static File[] copyBlockFiles(long blockId, long genStamp,
-                               File srcMeta, File srcFile, File destRoot)
+  static File[] copyBlockFiles(long blockId, long genStamp, File srcMeta,
+      File srcFile, File destRoot, boolean calculateChecksum)
       throws IOException {
     final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
     final File dstFile = new File(destDir, srcFile.getName());
     final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
-    computeChecksum(srcMeta, dstMeta, srcFile);
+    if (calculateChecksum) {
+      computeChecksum(srcMeta, dstMeta, srcFile);
+    } else {
+      try {
+        Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true);
+      } catch (IOException e) {
+        throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e);
+      }
+    }
 
     try {
       Storage.nativeCopyFileUnbuffered(srcFile, dstFile, true);
@@ -677,14 +685,73 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e);
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Copied " + srcMeta + " to " + dstMeta +
-          " and calculated checksum");
-      LOG.debug("Copied " + srcFile + " to " + dstFile);
+      if (calculateChecksum) {
+        LOG.debug("Copied " + srcMeta + " to " + dstMeta
+            + " and calculated checksum");
+      } else {
+        LOG.debug("Copied " + srcFile + " to " + dstFile);
+      }
     }
     return new File[] {dstMeta, dstFile};
   }
 
   /**
+   * Move block files from one storage to another storage.
+   * @return the old ReplicaInfo, i.e. the replica as it was before the move
+   * @throws IOException
+   */
+  @Override
+  public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
+      StorageType targetStorageType) throws IOException {
+    ReplicaInfo replicaInfo = getReplicaInfo(block);
+    if (replicaInfo.getState() != ReplicaState.FINALIZED) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.UNFINALIZED_REPLICA + block);
+    }
+    if (replicaInfo.getNumBytes() != block.getNumBytes()) {
+      throw new IOException("Corrupted replica " + replicaInfo
+          + " with a length of " + replicaInfo.getNumBytes()
+          + " expected length is " + block.getNumBytes());
+    }
+    if (replicaInfo.getVolume().getStorageType() == targetStorageType) {
+      throw new ReplicaAlreadyExistsException("Replica " + replicaInfo
+          + " already exists on storage " + targetStorageType);
+    }
+
+    if (replicaInfo.isOnTransientStorage()) {
+      // Block movement from RAM_DISK will be done by LazyPersist mechanism
+      throw new IOException("Replica " + replicaInfo
+          + " cannot be moved from storageType : "
+          + replicaInfo.getVolume().getStorageType());
+    }
+
+    FsVolumeImpl targetVolume = volumes.getNextVolume(targetStorageType,
+        block.getNumBytes());
+    File oldBlockFile = replicaInfo.getBlockFile();
+    File oldMetaFile = replicaInfo.getMetaFile();
+
+    // Copy files to temp dir first
+    File[] blockFiles = copyBlockFiles(block.getBlockId(),
+        block.getGenerationStamp(), oldMetaFile, oldBlockFile,
+        targetVolume.getTmpDir(block.getBlockPoolId()),
+        replicaInfo.isOnTransientStorage());
+
+    ReplicaInfo newReplicaInfo = new ReplicaInPipeline(
+        replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(),
+        targetVolume, blockFiles[0].getParentFile(), 0);
+    newReplicaInfo.setNumBytes(blockFiles[1].length());
+    // Finalize the copied files
+    newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
+
+    removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile,
+        oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId());
+
+    // Re-add the block to the scanner, replacing any old entry, to reschedule scanning.
+    datanode.getBlockScanner().addBlock(block);
+    return replicaInfo;
+  }
+
+  /**
    * Compute and store the checksum for a block file that does not already have
    * its checksum computed.
    *
@@ -2442,6 +2509,35 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
+  private void removeOldReplica(ReplicaInfo replicaInfo,
+      ReplicaInfo newReplicaInfo, File blockFile, File metaFile,
+      long blockFileUsed, long metaFileUsed, final String bpid) {
+    // Before deleting the files from the old storage we must notify the
+    // NN that the files are on the new storage. Otherwise a blockReport from
+    // the old storage might cause the NN to think the blocks are lost.
+    // Replicas must be evicted from client short-circuit caches, because the
+    // storage will no longer be same, and thus will require validating
+    // checksum.  This also stops a client from holding file descriptors,
+    // which would prevent the OS from reclaiming the memory.
+    ExtendedBlock extendedBlock =
+        new ExtendedBlock(bpid, newReplicaInfo);
+    datanode.getShortCircuitRegistry().processBlockInvalidation(
+        ExtendedBlockId.fromExtendedBlock(extendedBlock));
+    datanode.notifyNamenodeReceivedBlock(
+        extendedBlock, null, newReplicaInfo.getStorageUuid());
+
+    // Remove the old replicas
+    if (blockFile.delete() || !blockFile.exists()) {
+      ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, blockFileUsed);
+      if (metaFile.delete() || !metaFile.exists()) {
+        ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, metaFileUsed);
+      }
+    }
+
+    // If deletion failed then the directory scanner will cleanup the blocks
+    // eventually.
+  }
+
   class LazyWriter implements Runnable {
     private volatile boolean shouldRun = true;
     final int checkpointerInterval;
@@ -2601,30 +2697,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           }
         }
 
-        // Before deleting the files from transient storage we must notify the
-        // NN that the files are on the new storage. Else a blockReport from
-        // the transient storage might cause the NN to think the blocks are lost.
-        // Replicas must be evicted from client short-circuit caches, because the
-        // storage will no longer be transient, and thus will require validating
-        // checksum.  This also stops a client from holding file descriptors,
-        // which would prevent the OS from reclaiming the memory.
-        ExtendedBlock extendedBlock =
-            new ExtendedBlock(bpid, newReplicaInfo);
-        datanode.getShortCircuitRegistry().processBlockInvalidation(
-            ExtendedBlockId.fromExtendedBlock(extendedBlock));
-        datanode.notifyNamenodeReceivedBlock(
-            extendedBlock, null, newReplicaInfo.getStorageUuid());
-
-        // Remove the old replicas from transient storage.
-        if (blockFile.delete() || !blockFile.exists()) {
-          ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, blockFileUsed);
-          if (metaFile.delete() || !metaFile.exists()) {
-            ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, metaFileUsed);
-          }
-        }
-
-        // If deletion failed then the directory scanner will cleanup the blocks
-        // eventually.
+        removeOldReplica(replicaInfo, newReplicaInfo, blockFile, metaFile,
+            blockFileUsed, metaFileUsed, bpid);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 1d7540c..48427ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -129,6 +129,10 @@ public class FsVolumeImpl implements FsVolumeSpi {
     return getBlockPoolSlice(bpid).getLazypersistDir();
   }
 
+  File getTmpDir(String bpid) throws IOException {
+    return getBlockPoolSlice(bpid).getTmpDir();
+  }
+
   void decDfsUsed(String bpid, long value) {
     synchronized(dataset) {
       BlockPoolSlice bp = bpSlices.get(bpid);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
index 5fdcc2f..c9aba8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
@@ -232,7 +232,7 @@ class RamDiskAsyncLazyPersistService {
       try {
         // No FsDatasetImpl lock for the file copy
         File targetFiles[] = FsDatasetImpl.copyBlockFiles(
-            blockId, genStamp, metaFile, blockFile, lazyPersistDir);
+            blockId, genStamp, metaFile, blockFile, lazyPersistDir, true);
 
         // Lock FsDataSetImpl during onCompleteLazyPersist callback
         datanode.getFSDataset().onCompleteLazyPersist(bpId, blockId,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 59814af..108eb38 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -86,8 +86,8 @@ public class Mover {
       return get(sources, ml);
     }
 
-    private StorageGroup getTarget(MLocation ml) {
-      return get(targets, ml);
+    private StorageGroup getTarget(String uuid, StorageType storageType) {
+      return targets.get(uuid, storageType);
     }
 
     private static <G extends StorageGroup> G get(StorageGroupMap<G> map, MLocation ml) {
@@ -387,6 +387,11 @@ public class Mover {
 
     boolean scheduleMoveReplica(DBlock db, Source source,
         List<StorageType> targetTypes) {
+      // Match storage on the same node
+      if (chooseTargetInSameNode(db, source, targetTypes)) {
+        return true;
+      }
+
       if (dispatcher.getCluster().isNodeGroupAware()) {
         if (chooseTarget(db, source, targetTypes, Matcher.SAME_NODE_GROUP)) {
           return true;
@@ -401,6 +406,26 @@ public class Mover {
       return chooseTarget(db, source, targetTypes, Matcher.ANY_OTHER);
     }
 
+    /**
+     * Choose the target storage within the same Datanode if possible.
+     */
+    boolean chooseTargetInSameNode(DBlock db, Source source,
+        List<StorageType> targetTypes) {
+      for (StorageType t : targetTypes) {
+        StorageGroup target = storages.getTarget(source.getDatanodeInfo()
+            .getDatanodeUuid(), t);
+        if (target == null) {
+          continue;
+        }
+        final PendingMove pm = source.addPendingMove(db, target);
+        if (pm != null) {
+          dispatcher.executePendingMove(pm);
+          return true;
+        }
+      }
+      return false;
+    }
+
     boolean chooseTarget(DBlock db, Source source,
         List<StorageType> targetTypes, Matcher matcher) {
       final NetworkTopology cluster = dispatcher.getCluster(); 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
index f8c583a..41c8f8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.junit.Assert;
@@ -75,7 +76,8 @@ public class TestBlockInfo {
     }
 
     // Try to move one of the blocks to a different storage.
-    boolean added = storage2.addBlock(blockInfos[NUM_BLOCKS/2]);
+    boolean added =
+        storage2.addBlock(blockInfos[NUM_BLOCKS / 2]) == AddBlockResult.ADDED;
     Assert.assertThat(added, is(false));
     Assert.assertThat(blockInfos[NUM_BLOCKS/2].getStorageInfo(0), is(storage2));
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
index e00a4c3..fe639e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.junit.Test;
 
@@ -61,18 +62,17 @@ public class TestDatanodeDescriptor {
     BlockInfo blk1 = new BlockInfo(new Block(2L), (short) 2);
     DatanodeStorageInfo[] storages = dd.getStorageInfos();
     assertTrue(storages.length > 0);
-    final String storageID = storages[0].getStorageID();
     // add first block
-    assertTrue(storages[0].addBlock(blk));
+    assertTrue(storages[0].addBlock(blk) == AddBlockResult.ADDED);
     assertEquals(1, dd.numBlocks());
     // remove a non-existent block
     assertFalse(dd.removeBlock(blk1));
     assertEquals(1, dd.numBlocks());
     // add an existent block
-    assertFalse(storages[0].addBlock(blk));
+    assertFalse(storages[0].addBlock(blk) == AddBlockResult.ADDED);
     assertEquals(1, dd.numBlocks());
     // add second block
-    assertTrue(storages[0].addBlock(blk1));
+    assertTrue(storages[0].addBlock(blk1) == AddBlockResult.ADDED);
     assertEquals(2, dd.numBlocks());
     // remove first block
     assertTrue(dd.removeBlock(blk));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 3e5034a..e03b756 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1260,5 +1260,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   public void onFailLazyPersist(String bpId, long blockId) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
+      StorageType targetStorageType) throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
index e0d7964..fbdfebf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
@@ -17,15 +17,14 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.SocketException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -40,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.StorageType;
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
@@ -200,7 +201,51 @@ public class TestBlockReplacement {
       cluster.shutdown();
     }
   }
-  
+
+  @Test
+  public void testBlockMoveAcrossStorageInSameNode() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    // create only one datanode in the cluster to verify movement within
+    // datanode.
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).storageTypes(
+            new StorageType[] { StorageType.DISK, StorageType.ARCHIVE })
+            .build();
+    try {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final Path file = new Path("/testBlockMoveAcrossStorageInSameNode/file");
+      DFSTestUtil.createFile(dfs, file, 1024, (short) 1, 1024);
+      LocatedBlocks locatedBlocks = dfs.getClient().getLocatedBlocks(file.toString(), 0);
+      // get the current location of the file's first block
+      LocatedBlock locatedBlock = locatedBlocks.get(0);
+      ExtendedBlock block = locatedBlock.getBlock();
+      DatanodeInfo[] locations = locatedBlock.getLocations();
+      assertEquals(1, locations.length);
+      StorageType[] storageTypes = locatedBlock.getStorageTypes();
+      // current block should be written to DISK
+      assertTrue(storageTypes[0] == StorageType.DISK);
+      
+      DatanodeInfo source = locations[0];
+      // move block to ARCHIVE by using same DataNodeInfo for source, proxy and
+      // destination so that movement happens within datanode 
+      assertTrue(replaceBlock(block, source, source, source,
+          StorageType.ARCHIVE));
+      
+      // wait until the namenode has been notified of the move
+      Thread.sleep(3000);
+      locatedBlocks = dfs.getClient().getLocatedBlocks(file.toString(), 0);
+      // get the current 
+      locatedBlock = locatedBlocks.get(0);
+      assertEquals("Storage should be only one", 1,
+          locatedBlock.getLocations().length);
+      assertTrue("Block should be moved to ARCHIVE", locatedBlock
+          .getStorageTypes()[0] == StorageType.ARCHIVE);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /* check if file's blocks have expected number of replicas,
    * and exist at all of includeNodes
    */
@@ -259,24 +304,42 @@ public class TestBlockReplacement {
    */
   private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source,
       DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
+    return replaceBlock(block, source, sourceProxy, destination,
+        StorageType.DEFAULT);
+  }
+
+  /*
+   * Replace block, requesting targetStorageType for the new replica
+   */
+  private boolean replaceBlock(
+      ExtendedBlock block,
+      DatanodeInfo source,
+      DatanodeInfo sourceProxy,
+      DatanodeInfo destination,
+      StorageType targetStorageType) throws IOException, SocketException {
     Socket sock = new Socket();
-    sock.connect(NetUtils.createSocketAddr(
-        destination.getXferAddr()), HdfsServerConstants.READ_TIMEOUT); 
-    sock.setKeepAlive(true);
-    // sendRequest
-    DataOutputStream out = new DataOutputStream(sock.getOutputStream());
-    new Sender(out).replaceBlock(block, StorageType.DEFAULT,
-        BlockTokenSecretManager.DUMMY_TOKEN,
-        source.getDatanodeUuid(), sourceProxy);
-    out.flush();
-    // receiveResponse
-    DataInputStream reply = new DataInputStream(sock.getInputStream());
+    try {
+      sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()),
+          HdfsServerConstants.READ_TIMEOUT);
+      sock.setKeepAlive(true);
+      // sendRequest
+      DataOutputStream out = new DataOutputStream(sock.getOutputStream());
+      new Sender(out).replaceBlock(block, targetStorageType,
+          BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(),
+          sourceProxy);
+      out.flush();
+      // receiveResponse
+      DataInputStream reply = new DataInputStream(sock.getInputStream());
 
-    BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply);
-    while (proto.getStatus() == Status.IN_PROGRESS) {
-      proto = BlockOpResponseProto.parseDelimitedFrom(reply);
+      BlockOpResponseProto proto =
+          BlockOpResponseProto.parseDelimitedFrom(reply);
+      while (proto.getStatus() == Status.IN_PROGRESS) {
+        proto = BlockOpResponseProto.parseDelimitedFrom(reply);
+      }
+      return proto.getStatus() == Status.SUCCESS;
+    } finally {
+      sock.close();
     }
-    return proto.getStatus() == Status.SUCCESS;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index 5866c7f..c9fc5ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
 import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.ToolRunner;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -79,6 +79,52 @@ public class TestMover {
     }
   }
 
+  @Test
+  public void testScheduleBlockWithinSameNode() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3)
+        .storageTypes(
+            new StorageType[] { StorageType.DISK, StorageType.ARCHIVE })
+        .build();
+    try {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final String file = "/testScheduleWithinSameNode/file";
+      Path dir = new Path("/testScheduleWithinSameNode");
+      dfs.mkdirs(dir);
+      // write to DISK
+      dfs.setStoragePolicy(dir, "HOT");
+      {
+        final FSDataOutputStream out = dfs.create(new Path(file));
+        out.writeChars("testScheduleWithinSameNode");
+        out.close();
+      }
+
+      //verify before movement
+      LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+      StorageType[] storageTypes = lb.getStorageTypes();
+      for (StorageType storageType : storageTypes) {
+        Assert.assertTrue(StorageType.DISK == storageType);
+      }
+      // move to ARCHIVE
+      dfs.setStoragePolicy(dir, "COLD");
+      int rc = ToolRunner.run(conf, new Mover.Cli(),
+          new String[] { "-p", dir.toString() });
+      Assert.assertEquals("Movement to ARCHIVE should be successfull", 0, rc);
+
+      // Wait until the namenode has been notified of the moves
+      Thread.sleep(3000);
+      lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+      storageTypes = lb.getStorageTypes();
+      for (StorageType storageType : storageTypes) {
+        Assert.assertTrue(StorageType.ARCHIVE == storageType);
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   private void checkMovePaths(List<Path> actual, Path... expected) {
     Assert.assertEquals(expected.length, actual.size());
     for (Path p : expected) {