You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2015/02/12 00:13:59 UTC

hadoop git commit: HDFS-6133. Add a feature for replica pinning so that a pinned replica will not be moved by Balancer/Mover. Contributed by zhaoyunjiong

Repository: hadoop
Updated Branches:
  refs/heads/trunk 50625e660 -> 085b1e293


HDFS-6133. Add a feature for replica pinning so that a pinned replica will not be moved by Balancer/Mover.  Contributed by zhaoyunjiong


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/085b1e29
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/085b1e29
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/085b1e29

Branch: refs/heads/trunk
Commit: 085b1e293ff53f7a86aa21406cfd4bfa0f3bf33b
Parents: 50625e6
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Wed Feb 11 15:09:29 2015 -0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Wed Feb 11 15:12:12 2015 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  4 ++
 .../org/apache/hadoop/hdfs/DFSOutputStream.java | 22 +++++-
 .../hadoop/hdfs/DistributedFileSystem.java      |  7 +-
 .../datatransfer/DataTransferProtocol.java      |  6 +-
 .../hdfs/protocol/datatransfer/Receiver.java    |  4 +-
 .../hdfs/protocol/datatransfer/Sender.java      |  8 ++-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 21 ++++++
 .../hdfs/server/datanode/BlockReceiver.java     | 12 +++-
 .../hadoop/hdfs/server/datanode/DataNode.java   |  2 +-
 .../hdfs/server/datanode/DataXceiver.java       | 34 +++++++---
 .../server/datanode/fsdataset/FsDatasetSpi.java | 13 ++++
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 40 +++++++++++
 .../src/main/proto/datatransfer.proto           |  3 +
 .../src/main/resources/hdfs-default.xml         |  6 ++
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     | 27 ++++++--
 .../hadoop/hdfs/TestDataTransferProtocol.java   |  2 +-
 .../hdfs/server/balancer/TestBalancer.java      | 71 +++++++++++++++++---
 .../server/datanode/SimulatedFSDataset.java     | 11 +++
 .../hdfs/server/datanode/TestDiskError.java     |  2 +-
 .../extdataset/ExternalDatasetImpl.java         |  9 +++
 21 files changed, 271 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 9a6c551..5d0a6f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -332,6 +332,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7720. Quota by Storage Type API, tools and ClientNameNode Protocol
     changes. (Xiaoyu Yao via Arpit Agarwal)
 
+    HDFS-6133. Add a feature for replica pinning so that a pinned replica
+    will not be moved by Balancer/Mover.  (zhaoyunjiong via szetszwo)
+
   IMPROVEMENTS
 
     HDFS-7055. Add tracing to DFSInputStream (cmccabe)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index e4343bb..975f023 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -778,4 +778,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   // 10 days
   public static final long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
       TimeUnit.DAYS.toMillis(10);
+  public static final String DFS_DATANODE_BLOCK_PINNING_ENABLED = 
+    "dfs.datanode.block-pinning.enabled";
+  public static final boolean DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT =
+    false;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 9560e01..2bf5c40 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -1443,11 +1443,13 @@ public class DFSOutputStream extends FSOutputSummer
           ExtendedBlock blockCopy = new ExtendedBlock(block);
           blockCopy.setNumBytes(blockSize);
 
+          boolean[] targetPinnings = getPinnings(nodes);
           // send the request
           new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
               dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, 
               nodes.length, block.getNumBytes(), bytesSent, newGS,
-              checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile);
+              checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
+            (targetPinnings == null ? false : targetPinnings[0]), targetPinnings);
   
           // receive ack for connect
           BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@@ -1535,6 +1537,24 @@ public class DFSOutputStream extends FSOutputSummer
       }
     }
 
+    private boolean[] getPinnings(DatanodeInfo[] nodes) {
+      if (favoredNodes == null) {
+        return null;
+      } else {
+        boolean[] pinnings = new boolean[nodes.length];
+        for (int i = 0; i < nodes.length; i++) {
+          pinnings[i] = false;
+          for (int j = 0; j < favoredNodes.length; j++) {
+            if (nodes[i].getXferAddrWithHostname().equals(favoredNodes[j])) {
+              pinnings[i] = true;
+              break;
+            }
+          }
+        }
+        return pinnings;
+      }
+    }
+
     private LocatedBlock locateFollowingBlock(long start,
         DatanodeInfo[] excludedNodes)  throws IOException {
       int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 63d48bc..f777817 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -352,9 +352,10 @@ public class DistributedFileSystem extends FileSystem {
    * Progressable)} with the addition of favoredNodes that is a hint to 
    * where the namenode should place the file blocks.
    * The favored nodes hint is not persisted in HDFS. Hence it may be honored
-   * at the creation time only. HDFS could move the blocks during balancing or
-   * replication, to move the blocks from favored nodes. A value of null means
-   * no favored nodes for this create
+   * at the creation time only. With favored nodes, blocks will be pinned on
+   * the datanodes to prevent the balancer from moving them. HDFS could still
+   * move the blocks away from favored nodes during replication. A value of
+   * null means no favored nodes for this create.
    */
   public HdfsDataOutputStream create(final Path f,
       final FsPermission permission, final boolean overwrite,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index f6b99e6..5fc263e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -92,6 +92,8 @@ public interface DataTransferProtocol {
    * @param minBytesRcvd minimum number of bytes received.
    * @param maxBytesRcvd maximum number of bytes received.
    * @param latestGenerationStamp the latest generation stamp of the block.
+   * @param pinning whether to pin the block, so Balancer won't move it.
+   * @param targetPinnings whether to pin the block on target datanode
    */
   public void writeBlock(final ExtendedBlock blk,
       final StorageType storageType, 
@@ -107,7 +109,9 @@ public interface DataTransferProtocol {
       final long latestGenerationStamp,
       final DataChecksum requestedChecksum,
       final CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist) throws IOException;
+      final boolean allowLazyPersist,
+      final boolean pinning,
+      final boolean[] targetPinnings) throws IOException;
   /**
    * Transfer a block to another datanode.
    * The block stage must be

http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index 2ea1787..7994027 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -149,7 +149,9 @@ public abstract class Receiver implements DataTransferProtocol {
           (proto.hasCachingStrategy() ?
               getCachingStrategy(proto.getCachingStrategy()) :
             CachingStrategy.newDefaultStrategy()),
-          (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false));
+          (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false),
+          (proto.hasPinning() ? proto.getPinning(): false),
+          (PBHelper.convertBooleanList(proto.getTargetPinningsList())));
     } finally {
      if (traceScope != null) traceScope.close();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index 844c270..eb30afb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -129,7 +129,9 @@ public class Sender implements DataTransferProtocol {
       final long latestGenerationStamp,
       DataChecksum requestedChecksum,
       final CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist) throws IOException {
+      final boolean allowLazyPersist,
+      final boolean pinning,
+      final boolean[] targetPinnings) throws IOException {
     ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
         blk, clientName, blockToken);
     
@@ -148,7 +150,9 @@ public class Sender implements DataTransferProtocol {
       .setLatestGenerationStamp(latestGenerationStamp)
       .setRequestedChecksum(checksumProto)
       .setCachingStrategy(getCachingStrategy(cachingStrategy))
-      .setAllowLazyPersist(allowLazyPersist);
+      .setAllowLazyPersist(allowLazyPersist)
+      .setPinning(pinning)
+      .addAllTargetPinnings(PBHelper.convert(targetPinnings, 1));
     
     if (source != null) {
       proto.setSource(PBHelper.convertDatanodeInfo(source));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index e4746cc..8ecd767 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -2960,4 +2960,25 @@ public class PBHelper {
         ezKeyVersionName);
   }
 
+  public static List<Boolean> convert(boolean[] targetPinnings, int idx) {
+    List<Boolean> pinnings = new ArrayList<Boolean>();
+    if (targetPinnings == null) {
+      pinnings.add(Boolean.FALSE);
+    } else {
+      for (; idx < targetPinnings.length; ++idx) {
+        pinnings.add(Boolean.valueOf(targetPinnings[idx]));
+      }
+    }
+    return pinnings;
+  }
+
+  public static boolean[] convertBooleanList(
+    List<Boolean> targetPinningsList) {
+    final boolean[] targetPinnings = new boolean[targetPinningsList.size()];
+    for (int i = 0; i < targetPinningsList.size(); i++) {
+      targetPinnings[i] = targetPinningsList.get(i);
+    }
+    return targetPinnings;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 65b4f2d..368d80d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -132,6 +132,8 @@ class BlockReceiver implements Closeable {
   private long lastResponseTime = 0;
   private boolean isReplaceBlock = false;
   private DataOutputStream replyOut = null;
+  
+  private boolean pinning;
 
   BlockReceiver(final ExtendedBlock block, final StorageType storageType,
       final DataInputStream in,
@@ -141,7 +143,8 @@ class BlockReceiver implements Closeable {
       final String clientname, final DatanodeInfo srcDataNode,
       final DataNode datanode, DataChecksum requestedChecksum,
       CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist) throws IOException {
+      final boolean allowLazyPersist,
+      final boolean pinning) throws IOException {
     try{
       this.block = block;
       this.in = in;
@@ -165,12 +168,14 @@ class BlockReceiver implements Closeable {
       this.isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
           || stage == BlockConstructionStage.TRANSFER_FINALIZED;
 
+      this.pinning = pinning;
       if (LOG.isDebugEnabled()) {
         LOG.debug(getClass().getSimpleName() + ": " + block
             + "\n  isClient  =" + isClient + ", clientname=" + clientname
             + "\n  isDatanode=" + isDatanode + ", srcDataNode=" + srcDataNode
             + "\n  inAddr=" + inAddr + ", myAddr=" + myAddr
             + "\n  cachingStrategy = " + cachingStrategy
+            + "\n  pinning=" + pinning
             );
       }
 
@@ -1279,6 +1284,11 @@ class BlockReceiver implements Closeable {
           : 0;
       block.setNumBytes(replicaInfo.getNumBytes());
       datanode.data.finalizeBlock(block);
+      
+      if (pinning) {
+        datanode.data.setPinning(block);
+      }
+      
       datanode.closeBlock(
           block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid());
       if (ClientTraceLog.isInfoEnabled() && isClient) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index ed3ec7d..8d3b3a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2068,7 +2068,7 @@ public class DataNode extends ReconfigurableBase
         new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
             clientname, targets, targetStorageTypes, srcNode,
             stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
-            false);
+            false, false, null);
 
         // send data & checksum
         blockSender.sendBlock(out, unbufOut, null);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 3a2723f..bb5323a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -581,7 +581,9 @@ class DataXceiver extends Receiver implements Runnable {
       final long latestGenerationStamp,
       DataChecksum requestedChecksum,
       CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist) throws IOException {
+      final boolean allowLazyPersist,
+      final boolean pinning,
+      final boolean[] targetPinnings) throws IOException {
     previousOpClientName = clientname;
     updateCurrentThreadName("Receiving block " + block);
     final boolean isDatanode = clientname.length() == 0;
@@ -594,14 +596,14 @@ class DataXceiver extends Receiver implements Runnable {
       throw new IOException(stage + " does not support multiple targets "
           + Arrays.asList(targets));
     }
-
+    
     if (LOG.isDebugEnabled()) {
       LOG.debug("opWriteBlock: stage=" + stage + ", clientname=" + clientname 
       		+ "\n  block  =" + block + ", newGs=" + latestGenerationStamp
       		+ ", bytesRcvd=[" + minBytesRcvd + ", " + maxBytesRcvd + "]"
           + "\n  targets=" + Arrays.asList(targets)
           + "; pipelineSize=" + pipelineSize + ", srcDataNode=" + srcDataNode
-          );
+          + ", pinning=" + pinning);
       LOG.debug("isDatanode=" + isDatanode
           + ", isClient=" + isClient
           + ", isTransfer=" + isTransfer);
@@ -643,7 +645,7 @@ class DataXceiver extends Receiver implements Runnable {
             peer.getLocalAddressString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             clientname, srcDataNode, datanode, requestedChecksum,
-            cachingStrategy, allowLazyPersist);
+            cachingStrategy, allowLazyPersist, pinning);
 
         storageUuid = blockReceiver.getStorageUuid();
       } else {
@@ -686,10 +688,19 @@ class DataXceiver extends Receiver implements Runnable {
           mirrorIn = new DataInputStream(unbufMirrorIn);
 
           // Do not propagate allowLazyPersist to downstream DataNodes.
-          new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
+          if (targetPinnings != null && targetPinnings.length > 0) {
+            new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
               blockToken, clientname, targets, targetStorageTypes, srcDataNode,
               stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
-              latestGenerationStamp, requestedChecksum, cachingStrategy, false);
+              latestGenerationStamp, requestedChecksum, cachingStrategy,
+              false, targetPinnings[0], targetPinnings);
+          } else {
+            new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
+              blockToken, clientname, targets, targetStorageTypes, srcDataNode,
+              stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
+              latestGenerationStamp, requestedChecksum, cachingStrategy,
+              false, false, targetPinnings);
+          }
 
           mirrorOut.flush();
 
@@ -949,7 +960,14 @@ class DataXceiver extends Receiver implements Runnable {
       }
 
     }
-
+    
+    if (datanode.data.getPinning(block)) {
+      String msg = "Not able to copy block " + block.getBlockId() + " " +
+          "to " + peer.getRemoteAddressString() + " because it's pinned ";
+      LOG.info(msg);
+      sendResponse(ERROR, msg);
+    }
+    
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
       String msg = "Not able to copy block " + block.getBlockId() + " " +
           "to " + peer.getRemoteAddressString() + " because threads " +
@@ -1109,7 +1127,7 @@ class DataXceiver extends Receiver implements Runnable {
             proxyReply, proxySock.getRemoteSocketAddress().toString(),
             proxySock.getLocalSocketAddress().toString(),
             null, 0, 0, 0, "", null, datanode, remoteChecksum,
-            CachingStrategy.newDropBehind(), false);
+            CachingStrategy.newDropBehind(), false, false);
         
         // receive a block
         blockReceiver.receiveBlock(null, null, replyOut, null, 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 162e306..7d880d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -522,4 +522,17 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
      */
    public ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
         StorageType targetStorageType) throws IOException;
+
+  /**
+   * Set a block to be pinned on this datanode so that it cannot be moved
+   * by Balancer/Mover.
+   *
+   * It is a no-op when dfs.datanode.block-pinning.enabled is set to false.
+   */
+  public void setPinning(ExtendedBlock block) throws IOException;
+
+  /**
+   * Check whether the block was pinned
+   */
+  public boolean getPinning(ExtendedBlock block) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 9a981fe..8c8d744 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -50,6 +50,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
@@ -239,6 +243,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   // Used for synchronizing access to usage stats
   private final Object statsLock = new Object();
 
+  final LocalFileSystem localFS;
+
+  private boolean blockPinningEnabled;
+  
   /**
    * An FSDataset has a directory where it loads its data files.
    */
@@ -299,6 +307,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     lazyWriter = new Daemon(new LazyWriter(conf));
     lazyWriter.start();
     registerMBean(datanode.getDatanodeUuid());
+    localFS = FileSystem.getLocal(conf);
+    blockPinningEnabled = conf.getBoolean(
+      DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED,
+      DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT);
   }
 
   private void addVolume(Collection<StorageLocation> dataLocations,
@@ -2842,5 +2854,33 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       shouldRun = false;
     }
   }
+  
+  @Override
+  public void setPinning(ExtendedBlock block) throws IOException {
+    if (!blockPinningEnabled) {
+      return;
+    }
+
+    File f = getBlockFile(block);
+    Path p = new Path(f.getAbsolutePath());
+    
+    FsPermission oldPermission = localFS.getFileStatus(
+        new Path(f.getAbsolutePath())).getPermission();
+    //sticky bit is used for pinning purpose
+    FsPermission permission = new FsPermission(oldPermission.getUserAction(),
+        oldPermission.getGroupAction(), oldPermission.getOtherAction(), true);
+    localFS.setPermission(p, permission);
+  }
+  
+  @Override
+  public boolean getPinning(ExtendedBlock block) throws IOException {
+    if (!blockPinningEnabled) {
+      return  false;
+    }
+    File f = getBlockFile(block);
+        
+    FileStatus fss = localFS.getFileStatus(new Path(f.getAbsolutePath()));
+    return fss.getPermission().getStickyBit();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index 9512688..d72bb5e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -123,6 +123,9 @@ message OpWriteBlockProto {
    * to ignore this hint.
    */
   optional bool allowLazyPersist = 13 [default = false];
+  //whether to pin the block, so Balancer won't move it.
+  optional bool pinning = 14 [default = false];
+  repeated bool targetPinnings = 15;
 }
   
 message OpTransferBlockProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 9697543..9299ea3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2264,4 +2264,10 @@
   </description>
 </property>
 
+  <property>
+    <name>dfs.datanode.block-pinning.enabled</name>
+    <value>false</value>
+    <description>Whether to pin blocks on favored DataNodes.</description>
+  </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index f47bd24..c7fd2f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -293,13 +293,21 @@ public class DFSTestUtil {
   public static void createFile(FileSystem fs, Path fileName, int bufferLen,
       long fileLen, long blockSize, short replFactor, long seed)
       throws IOException {
-    createFile(fs, fileName, false, bufferLen, fileLen, blockSize,
-            replFactor, seed, false);
+    createFile(fs, fileName, false, bufferLen, fileLen, blockSize, replFactor,
+      seed, false);
   }
 
   public static void createFile(FileSystem fs, Path fileName,
       boolean isLazyPersist, int bufferLen, long fileLen, long blockSize,
       short replFactor, long seed, boolean flush) throws IOException {
+        createFile(fs, fileName, isLazyPersist, bufferLen, fileLen, blockSize,
+          replFactor, seed, flush, null);
+  }
+
+  public static void createFile(FileSystem fs, Path fileName,
+      boolean isLazyPersist, int bufferLen, long fileLen, long blockSize,
+      short replFactor, long seed, boolean flush,
+      InetSocketAddress[] favoredNodes) throws IOException {
   assert bufferLen > 0;
   if (!fs.mkdirs(fileName.getParent())) {
       throw new IOException("Mkdirs failed to create " +
@@ -312,10 +320,19 @@ public class DFSTestUtil {
     createFlags.add(LAZY_PERSIST);
   }
   try {
-      out = fs.create(fileName, FsPermission.getFileDefault(), createFlags,
-        fs.getConf().getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+    if (favoredNodes == null) {
+      out = fs.create(
+        fileName,
+        FsPermission.getFileDefault(),
+        createFlags,
+        fs.getConf().getInt(
+          CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
         replFactor, blockSize, null);
-
+    } else {
+      out = ((DistributedFileSystem) fs).create(fileName,
+        FsPermission.getDefault(), true, bufferLen, replFactor, blockSize,
+        null, favoredNodes);
+    }
       if (fileLen > 0) {
         byte[] toWrite = new byte[bufferLen];
         Random rb = new Random(seed);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index 55cd8e2..519c511 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -528,6 +528,6 @@ public class TestDataTransferProtocol {
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], new StorageType[1], null, stage,
         0, block.getNumBytes(), block.getNumBytes(), newGS,
-        checksum, CachingStrategy.newDefaultStrategy(), false);
+        checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 153baeb..03d2812 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -17,12 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
 import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
 import static org.junit.Assert.assertEquals;
@@ -33,6 +28,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.URI;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -59,12 +55,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
@@ -310,6 +302,63 @@ public class TestBalancer {
   }
   
   /**
+   * Make sure that the balancer cannot move pinned blocks.
+   * When favoredNodes are specified at file creation, the blocks are pinned
+   * using the sticky bit.
+   * @throws Exception if cluster setup or the balancer run fails
+   */
+  @Test(timeout=100000)
+  public void testBalancerWithPinnedBlocks() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    conf.setBoolean(DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
+    
+    long[] capacities =  new long[] { CAPACITY, CAPACITY };
+    String[] racks = { RACK0, RACK1 };
+    int numOfDatanodes = capacities.length;
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
+      .hosts(new String[]{"localhost", "localhost"})
+      .racks(racks).simulatedCapacities(capacities).build();
+
+    try {
+      cluster.waitActive();
+      client = NameNodeProxies.createProxy(conf,
+          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
+      
+      // fill the cluster to 80% of its capacity, using every datanode as a favored node
+      long totalCapacity = sum(capacities);
+      long totalUsedSpace = totalCapacity * 8 / 10;
+      InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes];
+      for (int i = 0; i < favoredNodes.length; i++) {
+        favoredNodes[i] = cluster.getDataNodes().get(i).getXferAddress();
+      }
+
+      DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
+          totalUsedSpace / numOfDatanodes, DEFAULT_BLOCK_SIZE,
+          (short) numOfDatanodes, 0, false, favoredNodes);
+      
+      // start up an empty node with the same capacity, on a new rack
+      cluster.startDataNodes(conf, 1, true, null, new String[] { RACK2 },
+          new long[] { CAPACITY });
+      
+      totalCapacity += CAPACITY;
+      
+      // wait for heartbeats to catch up before running the balancer
+      waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
+
+      // run the balancer; with the blocks pinned it must report no move progress
+      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+      int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
+      assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+      
+    } finally {
+      cluster.shutdown();
+    }
+    
+  }
+  
+  /**
    * Wait until balanced: each datanode gives utilization within 
    * BALANCE_ALLOWED_VARIANCE of average
    * @throws IOException

http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 6ff4603..36b62e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -127,6 +127,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     SimulatedOutputStream oStream = null;
     private long bytesAcked;
     private long bytesRcvd;
+    private boolean pinned = false;
     BInfo(String bpid, Block b, boolean forWriting) throws IOException {
       theBlock = new Block(b);
       if (theBlock.getNumBytes() < 0) {
@@ -1285,5 +1286,15 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     // TODO Auto-generated method stub
     return null;
   }
+  /** Marks {@code b} as pinned; assumes {@code b} exists in blockMap. */
+  @Override
+  public void setPinning(ExtendedBlock b) throws IOException {
+    blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned = true;
+  }
+  /** Returns the pinned flag of {@code b}; assumes it exists in blockMap. */
+  @Override
+  public boolean getPinning(ExtendedBlock b) throws IOException {
+    return blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
index f440bb6..fb219d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
@@ -152,7 +152,7 @@ public class TestDiskError {
         BlockTokenSecretManager.DUMMY_TOKEN, "",
         new DatanodeInfo[0], new StorageType[0], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
-        checksum, CachingStrategy.newDefaultStrategy(), false);
+        checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
     out.flush();
 
     // close the connection before sending the content of the block

http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index aa868d5..c377bdb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -399,4 +399,13 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   public long getNumBlocksFailedToUncache() {
     return 0;
   }
+  /** Intentionally a no-op: this implementation does not record pinning. */
+  @Override
+  public void setPinning(ExtendedBlock block) throws IOException {    
+  }
+  /** Always reports the block as not pinned. */
+  @Override
+  public boolean getPinning(ExtendedBlock block) throws IOException {
+    return false;
+  }
 }