You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2014/07/25 22:33:22 UTC

svn commit: r1613514 [2/6] - in /hadoop/common/branches/YARN-1051/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/ hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs-nfs/src/test/java/org/apac...

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java Fri Jul 25 20:33:09 2014
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -71,11 +72,20 @@ public interface DataTransferProtocol {
 
   /**
    * Write a block to a datanode pipeline.
-   * 
+   * The receiver datanode of this call is the next datanode in the pipeline.
+   * The other downstream datanodes are specified by the targets parameter.
+   * Note that the receiver {@link DatanodeInfo} is not required in the
+   * parameter list since the receiver datanode knows its info.  However, the
+   * {@link StorageType} for storing the replica in the receiver datanode is a 
+   * parameter since the receiver datanode may support multiple storage types.
+   *
    * @param blk the block being written.
+   * @param storageType for storing the replica in the receiver datanode.
    * @param blockToken security token for accessing the block.
    * @param clientName client's name.
-   * @param targets target datanodes in the pipeline.
+   * @param targets other downstream datanodes in the pipeline.
+   * @param targetStorageTypes target {@link StorageType}s corresponding
+   *                           to the target datanodes.
    * @param source source datanode.
    * @param stage pipeline stage.
    * @param pipelineSize the size of the pipeline.
@@ -84,9 +94,11 @@ public interface DataTransferProtocol {
    * @param latestGenerationStamp the latest generation stamp of the block.
    */
   public void writeBlock(final ExtendedBlock blk,
+      final StorageType storageType, 
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final DatanodeInfo[] targets,
+      final StorageType[] targetStorageTypes, 
       final DatanodeInfo source,
       final BlockConstructionStage stage,
       final int pipelineSize,
@@ -110,7 +122,8 @@ public interface DataTransferProtocol {
   public void transferBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
-      final DatanodeInfo[] targets) throws IOException;
+      final DatanodeInfo[] targets,
+      final StorageType[] targetStorageTypes) throws IOException;
 
   /**
    * Request short circuit access file descriptors from a DataNode.
@@ -148,11 +161,13 @@ public interface DataTransferProtocol {
    * It is used for balancing purpose.
    * 
    * @param blk the block being replaced.
+   * @param storageType the {@link StorageType} for storing the block.
    * @param blockToken security token for accessing the block.
    * @param delHint the hint for deleting the block in the original datanode.
    * @param source the source datanode for receiving the block.
    */
   public void replaceBlock(final ExtendedBlock blk,
+      final StorageType storageType, 
       final Token<BlockTokenIdentifier> blockToken,
       final String delHint,
       final DatanodeInfo source) throws IOException;

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java Fri Jul 25 20:33:09 2014
@@ -27,7 +27,7 @@ import java.nio.channels.ReadableByteCha
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.util.DirectBufferPool;
+import org.apache.hadoop.util.DirectBufferPool;
 import org.apache.hadoop.io.IOUtils;
 
 import com.google.common.base.Preconditions;

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Fri Jul 25 20:33:09 2014
@@ -25,6 +25,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
@@ -121,10 +122,13 @@ public abstract class Receiver implement
   /** Receive OP_WRITE_BLOCK */
   private void opWriteBlock(DataInputStream in) throws IOException {
     final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
+    final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
     writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
+        PBHelper.convertStorageType(proto.getStorageType()),
         PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
         proto.getHeader().getClientName(),
-        PBHelper.convert(proto.getTargetsList()),
+        targets,
+        PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
         PBHelper.convert(proto.getSource()),
         fromProto(proto.getStage()),
         proto.getPipelineSize(),
@@ -140,10 +144,12 @@ public abstract class Receiver implement
   private void opTransferBlock(DataInputStream in) throws IOException {
     final OpTransferBlockProto proto =
       OpTransferBlockProto.parseFrom(vintPrefixed(in));
+    final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
     transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
         PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
         proto.getHeader().getClientName(),
-        PBHelper.convert(proto.getTargetsList()));
+        targets,
+        PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length));
   }
 
   /** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */
@@ -176,6 +182,7 @@ public abstract class Receiver implement
   private void opReplaceBlock(DataInputStream in) throws IOException {
     OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
     replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
+        PBHelper.convertStorageType(proto.getStorageType()),
         PBHelper.convert(proto.getHeader().getToken()),
         proto.getDelHint(),
         PBHelper.convert(proto.getSource()));

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java Fri Jul 25 20:33:09 2014
@@ -25,6 +25,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
@@ -111,9 +112,11 @@ public class Sender implements DataTrans
 
   @Override
   public void writeBlock(final ExtendedBlock blk,
+      final StorageType storageType, 
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final DatanodeInfo[] targets,
+      final StorageType[] targetStorageTypes, 
       final DatanodeInfo source,
       final BlockConstructionStage stage,
       final int pipelineSize,
@@ -130,7 +133,9 @@ public class Sender implements DataTrans
 
     OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
       .setHeader(header)
+      .setStorageType(PBHelper.convertStorageType(storageType))
       .addAllTargets(PBHelper.convert(targets, 1))
+      .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes, 1))
       .setStage(toProto(stage))
       .setPipelineSize(pipelineSize)
       .setMinBytesRcvd(minBytesRcvd)
@@ -150,12 +155,14 @@ public class Sender implements DataTrans
   public void transferBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
-      final DatanodeInfo[] targets) throws IOException {
+      final DatanodeInfo[] targets,
+      final StorageType[] targetStorageTypes) throws IOException {
     
     OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
       .setHeader(DataTransferProtoUtil.buildClientHeader(
           blk, clientName, blockToken))
       .addAllTargets(PBHelper.convert(targets))
+      .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes))
       .build();
 
     send(out, Op.TRANSFER_BLOCK, proto);
@@ -196,11 +203,13 @@ public class Sender implements DataTrans
   
   @Override
   public void replaceBlock(final ExtendedBlock blk,
+      final StorageType storageType, 
       final Token<BlockTokenIdentifier> blockToken,
       final String delHint,
       final DatanodeInfo source) throws IOException {
     OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
       .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
+      .setStorageType(PBHelper.convertStorageType(storageType))
       .setDelHint(delHint)
       .setSource(PBHelper.convertDatanodeInfo(source))
       .build();

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java Fri Jul 25 20:33:09 2014
@@ -97,7 +97,7 @@ public class DatanodeProtocolClientSideT
     RPC.setProtocolEngine(conf, DatanodeProtocolPB.class,
         ProtobufRpcEngine.class);
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    rpcProxy = createNamenodeWithRetry(createNamenode(nameNodeAddr, conf, ugi));
+    rpcProxy = createNamenode(nameNodeAddr, conf, ugi);
   }
 
   private static DatanodeProtocolPB createNamenode(
@@ -109,33 +109,6 @@ public class DatanodeProtocolClientSideT
         org.apache.hadoop.ipc.Client.getPingInterval(conf), null).getProxy();
   }
 
-  /** Create a {@link NameNode} proxy */
-  static DatanodeProtocolPB createNamenodeWithRetry(
-      DatanodeProtocolPB rpcNamenode) {
-    RetryPolicy createPolicy = RetryPolicies
-        .retryUpToMaximumCountWithFixedSleep(5,
-            HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
-
-    Map<Class<? extends Exception>, RetryPolicy> remoteExceptionToPolicyMap = 
-        new HashMap<Class<? extends Exception>, RetryPolicy>();
-    remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class,
-        createPolicy);
-
-    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
-        new HashMap<Class<? extends Exception>, RetryPolicy>();
-    exceptionToPolicyMap.put(RemoteException.class, RetryPolicies
-        .retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL,
-            remoteExceptionToPolicyMap));
-    RetryPolicy methodPolicy = RetryPolicies.retryByException(
-        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
-    Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>();
-
-    methodNameToPolicyMap.put("create", methodPolicy);
-
-    return (DatanodeProtocolPB) RetryProxy.create(DatanodeProtocolPB.class,
-        rpcNamenode, methodNameToPolicyMap);
-  }
-
   @Override
   public void close() throws IOException {
     RPC.stopProxy(rpcProxy);

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java Fri Jul 25 20:33:09 2014
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
+import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcClientUtil;
 
@@ -61,7 +62,7 @@ import com.google.protobuf.ServiceExcept
 @InterfaceAudience.Private
 @InterfaceStability.Stable
 public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
-    ProtocolMetaInterface, Closeable {
+    ProtocolMetaInterface, Closeable, ProtocolTranslator {
   /** RpcController is not used and hence is set to null */
   private final static RpcController NULL_CONTROLLER = null;
   
@@ -89,6 +90,11 @@ public class NamenodeProtocolTranslatorP
   }
 
   @Override
+  public Object getUnderlyingProxyObject() {
+    return rpcProxy;
+  }
+
+  @Override
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
       throws IOException {
     GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder()

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Fri Jul 25 20:33:09 2014
@@ -150,6 +150,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto;
@@ -674,14 +675,8 @@ public class PBHelper {
       targets[i] = PBHelper.convert(locs.get(i));
     }
 
-    final int storageTypesCount = proto.getStorageTypesCount();
-    final StorageType[] storageTypes;
-    if (storageTypesCount == 0) {
-      storageTypes = null;
-    } else {
-      Preconditions.checkState(storageTypesCount == locs.size());
-      storageTypes = convertStorageTypeProtos(proto.getStorageTypesList());
-    }
+    final StorageType[] storageTypes = convertStorageTypes(
+        proto.getStorageTypesList(), locs.size());
 
     final int storageIDsCount = proto.getStorageIDsCount();
     final String[] storageIDs;
@@ -969,6 +964,20 @@ public class PBHelper {
       targets[i] = PBHelper.convert(targetList.get(i));
     }
 
+    StorageType[][] targetStorageTypes = new StorageType[targetList.size()][];
+    List<StorageTypesProto> targetStorageTypesList = blkCmd.getTargetStorageTypesList();
+    if (targetStorageTypesList.isEmpty()) { // missing storage types
+      for(int i = 0; i < targetStorageTypes.length; i++) {
+        targetStorageTypes[i] = new StorageType[targets[i].length];
+        Arrays.fill(targetStorageTypes[i], StorageType.DEFAULT);
+      }
+    } else {
+      for(int i = 0; i < targetStorageTypes.length; i++) {
+        List<StorageTypeProto> p = targetStorageTypesList.get(i).getStorageTypesList();
+        targetStorageTypes[i] = convertStorageTypes(p, targets[i].length); // proto -> StorageType; toArray() would throw ArrayStoreException
+      }
+    }
+
     List<StorageUuidsProto> targetStorageUuidsList = blkCmd.getTargetStorageUuidsList();
     String[][] targetStorageIDs = new String[targetStorageUuidsList.size()][];
     for(int i = 0; i < targetStorageIDs.length; i++) {
@@ -991,7 +1000,7 @@ public class PBHelper {
       throw new AssertionError("Unknown action type: " + blkCmd.getAction());
     }
     return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets,
-        targetStorageIDs);
+        targetStorageTypes, targetStorageIDs);
   }
 
   public static BlockIdCommand convert(BlockIdCommandProto blkIdCmd) {
@@ -1605,8 +1614,25 @@ public class PBHelper {
     }
   }
 
-  private static StorageTypeProto convertStorageType(
-      StorageType type) {
+  public static List<StorageTypeProto> convertStorageTypes(
+      StorageType[] types) {
+    return convertStorageTypes(types, 0);
+  }
+
+  public static List<StorageTypeProto> convertStorageTypes(
+      StorageType[] types, int startIdx) {
+    if (types == null) {
+      return null;
+    }
+    final List<StorageTypeProto> protos = new ArrayList<StorageTypeProto>(
+        types.length);
+    for (int i = startIdx; i < types.length; ++i) {
+      protos.add(convertStorageType(types[i]));
+    }
+    return protos; 
+  }
+
+  public static StorageTypeProto convertStorageType(StorageType type) {
     switch(type) {
     case DISK:
       return StorageTypeProto.DISK;
@@ -1621,7 +1647,7 @@ public class PBHelper {
   public static DatanodeStorage convert(DatanodeStorageProto s) {
     return new DatanodeStorage(s.getStorageUuid(),
                                PBHelper.convertState(s.getState()),
-                               PBHelper.convertType(s.getStorageType()));
+                               PBHelper.convertStorageType(s.getStorageType()));
   }
 
   private static State convertState(StorageState state) {
@@ -1634,7 +1660,7 @@ public class PBHelper {
     }
   }
 
-  private static StorageType convertType(StorageTypeProto type) {
+  public static StorageType convertStorageType(StorageTypeProto type) {
     switch(type) {
       case DISK:
         return StorageType.DISK;
@@ -1646,11 +1672,16 @@ public class PBHelper {
     }
   }
 
-  private static StorageType[] convertStorageTypeProtos(
-      List<StorageTypeProto> storageTypesList) {
-    final StorageType[] storageTypes = new StorageType[storageTypesList.size()];
-    for (int i = 0; i < storageTypes.length; ++i) {
-      storageTypes[i] = PBHelper.convertType(storageTypesList.get(i));
+  public static StorageType[] convertStorageTypes(
+      List<StorageTypeProto> storageTypesList, int expectedSize) {
+    final StorageType[] storageTypes = new StorageType[expectedSize];
+    if (storageTypesList.size() != expectedSize) { // missing storage types
+      Preconditions.checkState(storageTypesList.isEmpty());
+      Arrays.fill(storageTypes, StorageType.DEFAULT);
+    } else {
+      for (int i = 0; i < storageTypes.length; ++i) {
+        storageTypes[i] = convertStorageType(storageTypesList.get(i));
+      }
     }
     return storageTypes;
   }

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Jul 25 20:33:09 2014
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
 import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
 
 import java.io.BufferedInputStream;
@@ -57,14 +59,17 @@ import org.apache.hadoop.conf.Configured
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -202,6 +207,7 @@ public class Balancer {
   
   private final NameNodeConnector nnc;
   private final BalancingPolicy policy;
+  private final SaslDataTransferClient saslClient;
   private final double threshold;
   
   // all data node lists
@@ -352,19 +358,18 @@ public class Balancer {
         
         OutputStream unbufOut = sock.getOutputStream();
         InputStream unbufIn = sock.getInputStream();
-        if (nnc.getDataEncryptionKey() != null) {
-          IOStreamPair encryptedStreams =
-              DataTransferEncryptor.getEncryptedStreams(
-                  unbufOut, unbufIn, nnc.getDataEncryptionKey());
-          unbufOut = encryptedStreams.out;
-          unbufIn = encryptedStreams.in;
-        }
+        ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
+        Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
+        IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+          unbufIn, nnc, accessToken, target.datanode);
+        unbufOut = saslStreams.out;
+        unbufIn = saslStreams.in;
         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
             HdfsConstants.IO_FILE_BUFFER_SIZE));
         in = new DataInputStream(new BufferedInputStream(unbufIn,
             HdfsConstants.IO_FILE_BUFFER_SIZE));
         
-        sendRequest(out);
+        sendRequest(out, eb, StorageType.DEFAULT, accessToken);
         receiveResponse(in);
         bytesMoved.addAndGet(block.getNumBytes());
         LOG.info("Successfully moved " + this);
@@ -395,10 +400,10 @@ public class Balancer {
     }
     
     /* Send a block replace request to the output stream*/
-    private void sendRequest(DataOutputStream out) throws IOException {
-      final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
-      final Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
-      new Sender(out).replaceBlock(eb, accessToken,
+    private void sendRequest(DataOutputStream out, ExtendedBlock eb,
+        StorageType storageType, 
+        Token<BlockTokenIdentifier> accessToken) throws IOException {
+      new Sender(out).replaceBlock(eb, storageType, accessToken,
           source.getStorageID(), proxySource.getDatanode());
     }
     
@@ -876,6 +881,12 @@ public class Balancer {
     this.maxConcurrentMovesPerNode =
         conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+    this.saslClient = new SaslDataTransferClient(
+      DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+      TrustedChannelResolver.getInstance(conf),
+      conf.getBoolean(
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
   }
   
   /* Given a data node set, build a network topology and decide

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Fri Jul 25 20:33:09 2014
@@ -34,8 +34,8 @@ import org.apache.hadoop.hdfs.NameNodePr
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
-import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -50,7 +50,7 @@ import org.apache.hadoop.util.Daemon;
  * The class provides utilities for {@link Balancer} to access a NameNode
  */
 @InterfaceAudience.Private
-class NameNodeConnector {
+class NameNodeConnector implements DataEncryptionKeyFactory {
   private static final Log LOG = Balancer.LOG;
   private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
   private static final int MAX_NOT_CHANGED_ITERATIONS = 5;
@@ -72,7 +72,6 @@ class NameNodeConnector {
   private BlockTokenSecretManager blockTokenSecretManager;
   private Daemon keyupdaterthread; // AccessKeyUpdater thread
   private DataEncryptionKey encryptionKey;
-  private final TrustedChannelResolver trustedChannelResolver;
 
   NameNodeConnector(URI nameNodeUri,
       Configuration conf) throws IOException {
@@ -122,7 +121,6 @@ class NameNodeConnector {
     if (out == null) {
       throw new IOException("Another balancer is running");
     }
-    this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
   }
 
   boolean shouldContinue(long dispatchBlockMoveBytes) {
@@ -154,10 +152,10 @@ class NameNodeConnector {
           BlockTokenSecretManager.AccessMode.COPY));
     }
   }
-  
-  DataEncryptionKey getDataEncryptionKey()
-      throws IOException {
-    if (encryptDataTransfer && !this.trustedChannelResolver.isTrusted()) {
+
+  @Override
+  public DataEncryptionKey newDataEncryptionKey() {
+    if (encryptDataTransfer) {
       synchronized (this) {
         if (encryptionKey == null) {
           encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Jul 25 20:33:09 2014
@@ -725,7 +725,6 @@ public class BlockManager {
     final List<DatanodeStorageInfo> locations
         = new ArrayList<DatanodeStorageInfo>(blocksMap.numNodes(block));
     for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
-      final String storageID = storage.getStorageID();
       // filter invalidate replicas
       if(!invalidateBlocks.contains(storage.getDatanodeDescriptor(), block)) {
         locations.add(storage);
@@ -2637,7 +2636,7 @@ public class BlockManager {
     if (addedNode == delNodeHint) {
       delNodeHint = null;
     }
-    Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
+    Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>();
     Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
         .getNodes(block);
     for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) {
@@ -2657,7 +2656,7 @@ public class BlockManager {
         if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
           // exclude corrupt replicas
           if (corruptNodes == null || !corruptNodes.contains(cur)) {
-            nonExcess.add(cur);
+            nonExcess.add(storage);
           }
         }
       }
@@ -2681,7 +2680,7 @@ public class BlockManager {
    * If no such a node is available,
    * then pick a node with least free space
    */
-  private void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess, 
+  private void chooseExcessReplicates(final Collection<DatanodeStorageInfo> nonExcess, 
                               Block b, short replication,
                               DatanodeDescriptor addedNode,
                               DatanodeDescriptor delNodeHint,
@@ -2689,28 +2688,33 @@ public class BlockManager {
     assert namesystem.hasWriteLock();
     // first form a rack to datanodes map and
     BlockCollection bc = getBlockCollection(b);
-    final Map<String, List<DatanodeDescriptor>> rackMap
-        = new HashMap<String, List<DatanodeDescriptor>>();
-    final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
-    final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
+
+    final Map<String, List<DatanodeStorageInfo>> rackMap
+        = new HashMap<String, List<DatanodeStorageInfo>>();
+    final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
+    final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
     
     // split nodes into two sets
     // moreThanOne contains nodes on rack with more than one replica
     // exactlyOne contains the remaining nodes
-    replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne,
-        exactlyOne);
+    replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne);
     
     // pick one node to delete that favors the delete hint
     // otherwise pick one with least space from priSet if it is not empty
     // otherwise one node with least space from remains
     boolean firstOne = true;
+    final DatanodeStorageInfo delNodeHintStorage
+        = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint);
+    final DatanodeStorageInfo addedNodeStorage
+        = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, addedNode);
     while (nonExcess.size() - replication > 0) {
       // check if we can delete delNodeHint
-      final DatanodeInfo cur;
-      if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint)
-          && (moreThanOne.contains(delNodeHint)
-              || (addedNode != null && !moreThanOne.contains(addedNode))) ) {
-        cur = delNodeHint;
+      final DatanodeStorageInfo cur;
+      if (firstOne && delNodeHintStorage != null
+          && (moreThanOne.contains(delNodeHintStorage)
+              || (addedNodeStorage != null
+                  && !moreThanOne.contains(addedNodeStorage)))) {
+        cur = delNodeHintStorage;
       } else { // regular excessive replica removal
         cur = replicator.chooseReplicaToDelete(bc, b, replication,
         		moreThanOne, exactlyOne);
@@ -2722,7 +2726,7 @@ public class BlockManager {
           exactlyOne, cur);
 
       nonExcess.remove(cur);
-      addToExcessReplicate(cur, b);
+      addToExcessReplicate(cur.getDatanodeDescriptor(), b);
 
       //
       // The 'excessblocks' tracks blocks until we get confirmation
@@ -2733,7 +2737,7 @@ public class BlockManager {
       // should be deleted.  Items are removed from the invalidate list
       // upon giving instructions to the namenode.
       //
-      addToInvalidates(b, cur);
+      addToInvalidates(b, cur.getDatanodeDescriptor());
       blockLog.info("BLOCK* chooseExcessReplicates: "
                 +"("+cur+", "+b+") is added to invalidated blocks set");
     }

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java Fri Jul 25 20:33:09 2014
@@ -124,11 +124,12 @@ public abstract class BlockPlacementPoli
                    listed in the previous parameter.
    * @return the replica that is the best candidate for deletion
    */
-  abstract public DatanodeDescriptor chooseReplicaToDelete(BlockCollection srcBC,
-                                      Block block, 
-                                      short replicationFactor,
-                                      Collection<DatanodeDescriptor> existingReplicas,
-                                      Collection<DatanodeDescriptor> moreExistingReplicas);
+  abstract public DatanodeStorageInfo chooseReplicaToDelete(
+      BlockCollection srcBC,
+      Block block, 
+      short replicationFactor,
+      Collection<DatanodeStorageInfo> existingReplicas,
+      Collection<DatanodeStorageInfo> moreExistingReplicas);
 
   /**
    * Used to setup a BlockPlacementPolicy object. This should be defined by 
@@ -175,21 +176,23 @@ public abstract class BlockPlacementPoli
    * @param exactlyOne The List of replica nodes on rack with only one replica
    * @param cur current replica to remove
    */
-  public void adjustSetsWithChosenReplica(final Map<String, 
-      List<DatanodeDescriptor>> rackMap,
-      final List<DatanodeDescriptor> moreThanOne,
-      final List<DatanodeDescriptor> exactlyOne, final DatanodeInfo cur) {
+  public void adjustSetsWithChosenReplica(
+      final Map<String, List<DatanodeStorageInfo>> rackMap,
+      final List<DatanodeStorageInfo> moreThanOne,
+      final List<DatanodeStorageInfo> exactlyOne,
+      final DatanodeStorageInfo cur) {
     
-    String rack = getRack(cur);
-    final List<DatanodeDescriptor> datanodes = rackMap.get(rack);
-    datanodes.remove(cur);
-    if (datanodes.isEmpty()) {
+    final String rack = getRack(cur.getDatanodeDescriptor());
+    final List<DatanodeStorageInfo> storages = rackMap.get(rack);
+    storages.remove(cur);
+    if (storages.isEmpty()) {
       rackMap.remove(rack);
     }
     if (moreThanOne.remove(cur)) {
-      if (datanodes.size() == 1) {
-        moreThanOne.remove(datanodes.get(0));
-        exactlyOne.add(datanodes.get(0));
+      if (storages.size() == 1) {
+        final DatanodeStorageInfo remaining = storages.get(0);
+        moreThanOne.remove(remaining);
+        exactlyOne.add(remaining);
       }
     } else {
       exactlyOne.remove(cur);
@@ -214,28 +217,28 @@ public abstract class BlockPlacementPoli
    * @param exactlyOne remains contains the remaining nodes
    */
   public void splitNodesWithRack(
-      Collection<DatanodeDescriptor> dataNodes,
-      final Map<String, List<DatanodeDescriptor>> rackMap,
-      final List<DatanodeDescriptor> moreThanOne,
-      final List<DatanodeDescriptor> exactlyOne) {
-    for(DatanodeDescriptor node : dataNodes) {
-      final String rackName = getRack(node);
-      List<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
-      if (datanodeList == null) {
-        datanodeList = new ArrayList<DatanodeDescriptor>();
-        rackMap.put(rackName, datanodeList);
+      final Iterable<DatanodeStorageInfo> storages,
+      final Map<String, List<DatanodeStorageInfo>> rackMap,
+      final List<DatanodeStorageInfo> moreThanOne,
+      final List<DatanodeStorageInfo> exactlyOne) {
+    for(DatanodeStorageInfo s: storages) {
+      final String rackName = getRack(s.getDatanodeDescriptor());
+      List<DatanodeStorageInfo> storageList = rackMap.get(rackName);
+      if (storageList == null) {
+        storageList = new ArrayList<DatanodeStorageInfo>();
+        rackMap.put(rackName, storageList);
       }
-      datanodeList.add(node);
+      storageList.add(s);
     }
     
     // split nodes into two sets
-    for(List<DatanodeDescriptor> datanodeList : rackMap.values()) {
-      if (datanodeList.size() == 1) {
+    for(List<DatanodeStorageInfo> storageList : rackMap.values()) {
+      if (storageList.size() == 1) {
         // exactlyOne contains nodes on rack with only one replica
-        exactlyOne.add(datanodeList.get(0));
+        exactlyOne.add(storageList.get(0));
       } else {
         // moreThanOne contains nodes on rack with more than one replica
-        moreThanOne.addAll(datanodeList);
+        moreThanOne.addAll(storageList);
       }
     }
   }

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Fri Jul 25 20:33:09 2014
@@ -145,14 +145,14 @@ public class BlockPlacementPolicyDefault
       List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>();
       boolean avoidStaleNodes = stats != null
           && stats.isAvoidingStaleDataNodesForWrite();
-      for (int i = 0; i < Math.min(favoredNodes.size(), numOfReplicas); i++) {
+      for (int i = 0; i < favoredNodes.size() && results.size() < numOfReplicas; i++) {
         DatanodeDescriptor favoredNode = favoredNodes.get(i);
         // Choose a single node which is local to favoredNode.
         // 'results' is updated within chooseLocalNode
         final DatanodeStorageInfo target = chooseLocalStorage(favoredNode,
             favoriteAndExcludedNodes, blocksize, 
             getMaxNodesPerRack(results.size(), numOfReplicas)[1],
-            results, avoidStaleNodes, storageType);
+            results, avoidStaleNodes, storageType, false);
         if (target == null) {
           LOG.warn("Could not find a target for file " + src
               + " with favored node " + favoredNode); 
@@ -271,7 +271,7 @@ public class BlockPlacementPolicyDefault
     try {
       if (numOfResults == 0) {
         writer = chooseLocalStorage(writer, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes, storageType)
+            maxNodesPerRack, results, avoidStaleNodes, storageType, true)
                 .getDatanodeDescriptor();
         if (--numOfReplicas == 0) {
           return writer;
@@ -345,12 +345,14 @@ public class BlockPlacementPolicyDefault
                                              int maxNodesPerRack,
                                              List<DatanodeStorageInfo> results,
                                              boolean avoidStaleNodes,
-                                             StorageType storageType)
+                                             StorageType storageType,
+                                             boolean fallbackToLocalRack)
       throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
-    if (localMachine == null)
+    if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
           maxNodesPerRack, results, avoidStaleNodes, storageType);
+    }
     if (preferLocalNode && localMachine instanceof DatanodeDescriptor) {
       DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
       // otherwise try local machine first
@@ -363,7 +365,11 @@ public class BlockPlacementPolicyDefault
           }
         }
       } 
-    }      
+    }
+
+    if (!fallbackToLocalRack) {
+      return null;
+    }
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes, blocksize,
         maxNodesPerRack, results, avoidStaleNodes, storageType);
@@ -636,15 +642,11 @@ public class BlockPlacementPolicyDefault
 
     // check the communication traffic of the target machine
     if (considerLoad) {
-      double avgLoad = 0;
-      if (stats != null) {
-        int size = stats.getNumDatanodesInService();
-        if (size != 0) {
-          avgLoad = (double)stats.getTotalLoad()/size;
-        }
-      }
-      if (node.getXceiverCount() > (2.0 * avgLoad)) {
-        logNodeIsNotChosen(storage, "the node is too busy ");
+      final double maxLoad = 2.0 * stats.getInServiceXceiverAverage();
+      final int nodeLoad = node.getXceiverCount();
+      if (nodeLoad > maxLoad) {
+        logNodeIsNotChosen(storage,
+            "the node is too busy (load:"+nodeLoad+" > "+maxLoad+") ");
         return false;
       }
     }
@@ -727,31 +729,34 @@ public class BlockPlacementPolicyDefault
   }
 
   @Override
-  public DatanodeDescriptor chooseReplicaToDelete(BlockCollection bc,
+  public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection bc,
       Block block, short replicationFactor,
-      Collection<DatanodeDescriptor> first,
-      Collection<DatanodeDescriptor> second) {
+      Collection<DatanodeStorageInfo> first,
+      Collection<DatanodeStorageInfo> second) {
     long oldestHeartbeat =
       now() - heartbeatInterval * tolerateHeartbeatMultiplier;
-    DatanodeDescriptor oldestHeartbeatNode = null;
+    DatanodeStorageInfo oldestHeartbeatStorage = null;
     long minSpace = Long.MAX_VALUE;
-    DatanodeDescriptor minSpaceNode = null;
+    DatanodeStorageInfo minSpaceStorage = null;
 
     // Pick the node with the oldest heartbeat or with the least free space,
     // if all hearbeats are within the tolerable heartbeat interval
-    for(DatanodeDescriptor node : pickupReplicaSet(first, second)) {
+    for(DatanodeStorageInfo storage : pickupReplicaSet(first, second)) {
+      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       long free = node.getRemaining();
       long lastHeartbeat = node.getLastUpdate();
       if(lastHeartbeat < oldestHeartbeat) {
         oldestHeartbeat = lastHeartbeat;
-        oldestHeartbeatNode = node;
+        oldestHeartbeatStorage = storage;
       }
       if (minSpace > free) {
         minSpace = free;
-        minSpaceNode = node;
+        minSpaceStorage = storage;
       }
     }
-    return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode;
+
+    return oldestHeartbeatStorage != null? oldestHeartbeatStorage
+        : minSpaceStorage;
   }
 
   /**
@@ -760,9 +765,9 @@ public class BlockPlacementPolicyDefault
    * replica while second set contains remaining replica nodes.
    * So pick up first set if not empty. If first is empty, then pick second.
    */
-  protected Collection<DatanodeDescriptor> pickupReplicaSet(
-      Collection<DatanodeDescriptor> first,
-      Collection<DatanodeDescriptor> second) {
+  protected Collection<DatanodeStorageInfo> pickupReplicaSet(
+      Collection<DatanodeStorageInfo> first,
+      Collection<DatanodeStorageInfo> second) {
     return first.isEmpty() ? second : first;
   }
   

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java Fri Jul 25 20:33:09 2014
@@ -70,7 +70,8 @@ public class BlockPlacementPolicyWithNod
   protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
       Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
-      StorageType storageType) throws NotEnoughReplicasException {
+      StorageType storageType, boolean fallbackToLocalRack
+      ) throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
@@ -97,6 +98,10 @@ public class BlockPlacementPolicyWithNod
     if (chosenStorage != null) {
       return chosenStorage;
     }
+
+    if (!fallbackToLocalRack) {
+      return null;
+    }
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes, 
         blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
@@ -286,9 +291,9 @@ public class BlockPlacementPolicyWithNod
    * If first is empty, then pick second.
    */
   @Override
-  public Collection<DatanodeDescriptor> pickupReplicaSet(
-      Collection<DatanodeDescriptor> first,
-      Collection<DatanodeDescriptor> second) {
+  public Collection<DatanodeStorageInfo> pickupReplicaSet(
+      Collection<DatanodeStorageInfo> first,
+      Collection<DatanodeStorageInfo> second) {
     // If no replica within same rack, return directly.
     if (first.isEmpty()) {
       return second;
@@ -296,25 +301,24 @@ public class BlockPlacementPolicyWithNod
     // Split data nodes in the first set into two sets, 
     // moreThanOne contains nodes on nodegroup with more than one replica
     // exactlyOne contains the remaining nodes
-    Map<String, List<DatanodeDescriptor>> nodeGroupMap = 
-        new HashMap<String, List<DatanodeDescriptor>>();
+    Map<String, List<DatanodeStorageInfo>> nodeGroupMap = 
+        new HashMap<String, List<DatanodeStorageInfo>>();
     
-    for(DatanodeDescriptor node : first) {
-      final String nodeGroupName = 
-          NetworkTopology.getLastHalf(node.getNetworkLocation());
-      List<DatanodeDescriptor> datanodeList = 
-          nodeGroupMap.get(nodeGroupName);
-      if (datanodeList == null) {
-        datanodeList = new ArrayList<DatanodeDescriptor>();
-        nodeGroupMap.put(nodeGroupName, datanodeList);
+    for(DatanodeStorageInfo storage : first) {
+      final String nodeGroupName = NetworkTopology.getLastHalf(
+          storage.getDatanodeDescriptor().getNetworkLocation());
+      List<DatanodeStorageInfo> storageList = nodeGroupMap.get(nodeGroupName);
+      if (storageList == null) {
+        storageList = new ArrayList<DatanodeStorageInfo>();
+        nodeGroupMap.put(nodeGroupName, storageList);
       }
-      datanodeList.add(node);
+      storageList.add(storage);
     }
     
-    final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
-    final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
+    final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
+    final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
     // split nodes into two sets
-    for(List<DatanodeDescriptor> datanodeList : nodeGroupMap.values()) {
+    for(List<DatanodeStorageInfo> datanodeList : nodeGroupMap.values()) {
       if (datanodeList.size() == 1 ) {
         // exactlyOne contains nodes on nodegroup with exactly one replica
         exactlyOne.add(datanodeList.get(0));

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Fri Jul 25 20:33:09 2014
@@ -345,7 +345,8 @@ public class DatanodeManager {
   
   /** Sort the located blocks by the distance to the target host. */
   public void sortLocatedBlocks(final String targethost,
-      final List<LocatedBlock> locatedblocks) {
+      final List<LocatedBlock> locatedblocks,
+      boolean randomizeBlockLocationsPerBlock) {
     //sort the blocks
     // As it is possible for the separation of node manager and datanode, 
     // here we should get node but not datanode only .
@@ -372,8 +373,8 @@ public class DatanodeManager {
           --lastActiveIndex;
       }
       int activeLen = lastActiveIndex + 1;      
-      networktopology.sortByDistance(client, b.getLocations(), activeLen,
-          b.getBlock().getBlockId());
+      networktopology.sortByDistance(client, b.getLocations(), activeLen, b
+          .getBlock().getBlockId(), randomizeBlockLocationsPerBlock);
     }
   }
   
@@ -820,7 +821,9 @@ public class DatanodeManager {
   }
 
   /** Start decommissioning the specified datanode. */
-  private void startDecommission(DatanodeDescriptor node) {
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  public void startDecommission(DatanodeDescriptor node) {
     if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
       for (DatanodeStorageInfo storage : node.getStorageInfos()) {
         LOG.info("Start Decommissioning " + node + " " + storage

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java Fri Jul 25 20:33:09 2014
@@ -52,6 +52,12 @@ public interface DatanodeStatistics {
   /** @return the xceiver count */
   public int getXceiverCount();
 
+  /** @return total xceiver count for non-decommission(ing|ed) nodes */
+  public int getInServiceXceiverCount();
+  
+  /** @return number of non-decommission(ing|ed) nodes */
+  public int getNumDatanodesInService();
+  
   /**
    * @return the total used space by data nodes for non-DFS purposes
    * such as storing temporary files on the local file system

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java Fri Jul 25 20:33:09 2014
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -290,4 +291,21 @@ public class DatanodeStorageInfo {
   public String toString() {
     return "[" + storageType + "]" + storageID + ":" + state;
   }
+
+  /** @return the first {@link DatanodeStorageInfo} in infos on the given
+   *          datanode, or null if the datanode is null or has no match
+   */
+  static DatanodeStorageInfo getDatanodeStorageInfo(
+      final Iterable<DatanodeStorageInfo> infos,
+      final DatanodeDescriptor datanode) {
+    if (datanode == null) {
+      return null;
+    }
+    for(DatanodeStorageInfo storage : infos) {
+      if (storage.getDatanodeDescriptor() == datanode) {
+        return storage;
+      }
+    }
+    return null;
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Fri Jul 25 20:33:09 2014
@@ -151,6 +151,16 @@ class HeartbeatManager implements Datano
   }
   
   @Override
+  public synchronized int getInServiceXceiverCount() {
+    return stats.nodesInServiceXceiverCount;
+  }
+  
+  @Override
+  public synchronized int getNumDatanodesInService() {
+    return stats.nodesInService;
+  }
+  
+  @Override
   public synchronized long getCacheCapacity() {
     return stats.cacheCapacity;
   }
@@ -178,7 +188,7 @@ class HeartbeatManager implements Datano
   }
 
   synchronized void register(final DatanodeDescriptor d) {
-    if (!datanodes.contains(d)) {
+    if (!d.isAlive) {
       addDatanode(d);
 
       //update its timestamp
@@ -191,6 +201,8 @@ class HeartbeatManager implements Datano
   }
 
   synchronized void addDatanode(final DatanodeDescriptor d) {
+    // update in-service node count
+    stats.add(d);
     datanodes.add(d);
     d.isAlive = true;
   }
@@ -323,6 +335,9 @@ class HeartbeatManager implements Datano
     private long cacheCapacity = 0L;
     private long cacheUsed = 0L;
 
+    private int nodesInService = 0;
+    private int nodesInServiceXceiverCount = 0;
+
     private int expiredHeartbeats = 0;
 
     private void add(final DatanodeDescriptor node) {
@@ -330,6 +345,8 @@ class HeartbeatManager implements Datano
       blockPoolUsed += node.getBlockPoolUsed();
       xceiverCount += node.getXceiverCount();
       if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+        nodesInService++;
+        nodesInServiceXceiverCount += node.getXceiverCount();
         capacityTotal += node.getCapacity();
         capacityRemaining += node.getRemaining();
       } else {
@@ -344,6 +361,8 @@ class HeartbeatManager implements Datano
       blockPoolUsed -= node.getBlockPoolUsed();
       xceiverCount -= node.getXceiverCount();
       if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+        nodesInService--;
+        nodesInServiceXceiverCount -= node.getXceiverCount();
         capacityTotal -= node.getCapacity();
         capacityRemaining -= node.getRemaining();
       } else {

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java Fri Jul 25 20:33:09 2014
@@ -93,7 +93,8 @@ public final class HdfsServerConstants {
     FORCE("-force"),
     NONINTERACTIVE("-nonInteractive"),
     RENAMERESERVED("-renameReserved"),
-    METADATAVERSION("-metadataVersion");
+    METADATAVERSION("-metadataVersion"),
+    UPGRADEONLY("-upgradeOnly");
 
     private static final Pattern ENUM_WITH_ROLLING_UPGRADE_OPTION = Pattern.compile(
         "(\\w+)\\((\\w+)\\)");

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Fri Jul 25 20:33:09 2014
@@ -575,7 +575,8 @@ class BPOfferService {
     switch(cmd.getAction()) {
     case DatanodeProtocol.DNA_TRANSFER:
       // Send a copy of a block to another datanode
-      dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
+      dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(),
+          bcmd.getTargets(), bcmd.getTargetStorageTypes());
       dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length);
       break;
     case DatanodeProtocol.DNA_INVALIDATE:

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Fri Jul 25 20:33:09 2014
@@ -84,6 +84,10 @@ class BlockPoolSliceScanner {
   
   private final SortedSet<BlockScanInfo> blockInfoSet
       = new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR);
+
+  private final SortedSet<BlockScanInfo> newBlockInfoSet =
+      new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR);
+
   private final GSet<Block, BlockScanInfo> blockMap
       = new LightWeightGSet<Block, BlockScanInfo>(
           LightWeightGSet.computeCapacity(0.5, "BlockMap"));
@@ -195,7 +199,7 @@ class BlockPoolSliceScanner {
       BlockScanInfo info = new BlockScanInfo( block );
       info.lastScanTime = scanTime--; 
       //still keep 'info.lastScanType' to NONE.
-      addBlockInfo(info);
+      addBlockInfo(info, false);
     }
 
     RollingLogs rollingLogs = null;
@@ -221,25 +225,42 @@ class BlockPoolSliceScanner {
     // Should we change throttler bandwidth every time bytesLeft changes?
     // not really required.
   }
-  
-  private synchronized void addBlockInfo(BlockScanInfo info) {
-    boolean added = blockInfoSet.add(info);
+
+  /**
+   * Add the BlockScanInfo to the sorted set of BlockScanInfo.
+   * @param info BlockScanInfo to be added
+   * @param isNewBlock true if the block is a new block, false if the
+   *          BlockScanInfo is being updated with a new scanTime
+   */
+  private synchronized void addBlockInfo(BlockScanInfo info,
+      boolean isNewBlock) {
+    boolean added = false;
+    if (isNewBlock) {
+      // check whether the block is already present
+      boolean exists = blockInfoSet.contains(info);
+      added = !exists && newBlockInfoSet.add(info);
+    } else {
+      added = blockInfoSet.add(info);
+    }
     blockMap.put(info);
     
     if (added) {
       updateBytesToScan(info.getNumBytes(), info.lastScanTime);
     }
   }
-  
+
   private synchronized void delBlockInfo(BlockScanInfo info) {
     boolean exists = blockInfoSet.remove(info);
+    if (!exists){
+      exists = newBlockInfoSet.remove(info);
+    }
     blockMap.remove(info);
 
     if (exists) {
       updateBytesToScan(-info.getNumBytes(), info.lastScanTime);
     }
   }
-  
+
   /** Update blockMap by the given LogEntry */
   private synchronized void updateBlockInfo(LogEntry e) {
     BlockScanInfo info = blockMap.get(new Block(e.blockId, 0, e.genStamp));
@@ -249,7 +270,7 @@ class BlockPoolSliceScanner {
       delBlockInfo(info);
       info.lastScanTime = e.verificationTime;
       info.lastScanType = ScanType.VERIFICATION_SCAN;
-      addBlockInfo(info);
+      addBlockInfo(info, false);
     }
   }
 
@@ -275,14 +296,14 @@ class BlockPoolSliceScanner {
     info = new BlockScanInfo(block.getLocalBlock());    
     info.lastScanTime = getNewBlockScanTime();
     
-    addBlockInfo(info);
+    addBlockInfo(info, true);
     adjustThrottler();
   }
   
   /** Deletes the block from internal structures */
   synchronized void deleteBlock(Block block) {
     BlockScanInfo info = blockMap.get(block);
-    if ( info != null ) {
+    if (info != null) {
       delBlockInfo(info);
     }
   }
@@ -310,23 +331,16 @@ class BlockPoolSliceScanner {
     }
   }
   
-  private synchronized void updateScanStatus(Block block, 
+  private synchronized void updateScanStatus(BlockScanInfo info,
                                              ScanType type,
                                              boolean scanOk) {
-    BlockScanInfo info = blockMap.get(block);
-    
-    if ( info != null ) {
-      delBlockInfo(info);
-    } else {
-      // It might already be removed. Thats ok, it will be caught next time.
-      info = new BlockScanInfo(block);
-    }
-    
+    delBlockInfo(info);
+
     long now = Time.monotonicNow();
     info.lastScanType = type;
     info.lastScanTime = now;
     info.lastScanOk = scanOk;
-    addBlockInfo(info);
+    addBlockInfo(info, false);
         
     // Don't update meta data if the verification failed.
     if (!scanOk) {
@@ -334,8 +348,8 @@ class BlockPoolSliceScanner {
     }
     
     if (verificationLog != null) {
-      verificationLog.append(now, block.getGenerationStamp(),
-          block.getBlockId());
+      verificationLog.append(now, info.getGenerationStamp(),
+          info.getBlockId());
     }
   }
   
@@ -434,11 +448,13 @@ class BlockPoolSliceScanner {
           totalTransientErrors++;
         }
         
-        updateScanStatus(block.getLocalBlock(), ScanType.VERIFICATION_SCAN, true);
+        updateScanStatus((BlockScanInfo)block.getLocalBlock(),
+            ScanType.VERIFICATION_SCAN, true);
 
         return;
       } catch (IOException e) {
-        updateScanStatus(block.getLocalBlock(), ScanType.VERIFICATION_SCAN, false);
+        updateScanStatus((BlockScanInfo)block.getLocalBlock(),
+            ScanType.VERIFICATION_SCAN, false);
 
         // If the block does not exists anymore, then its not an error
         if (!dataset.contains(block)) {
@@ -497,7 +513,7 @@ class BlockPoolSliceScanner {
   
   // Picks one block and verifies it
   private void verifyFirstBlock() {
-    Block block = null;
+    BlockScanInfo block = null;
     synchronized (this) {
       if (!blockInfoSet.isEmpty()) {
         block = blockInfoSet.first();
@@ -583,7 +599,7 @@ class BlockPoolSliceScanner {
           delBlockInfo(info);        
           info.lastScanTime = lastScanTime;
           lastScanTime += verifyInterval;
-          addBlockInfo(info);
+          addBlockInfo(info, false);
         }
       }
     }
@@ -679,12 +695,21 @@ class BlockPoolSliceScanner {
       throw e;
     } finally {
       rollVerificationLogs();
+      rollNewBlocksInfo();
       if (LOG.isDebugEnabled()) {
         LOG.debug("Done scanning block pool: " + blockPoolId);
       }
     }
   }
-  
+
+  // add new blocks to scan in next iteration
+  private synchronized void rollNewBlocksInfo() {
+    for (BlockScanInfo newBlock : newBlockInfoSet) {
+      blockInfoSet.add(newBlock);
+    }
+    newBlockInfoSet.clear();
+  }
+
   private synchronized void rollVerificationLogs() {
     if (verificationLog != null) {
       try {

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Fri Jul 25 20:33:09 2014
@@ -37,6 +37,7 @@ import java.util.zip.Checksum;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSOutputSummer;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -122,7 +123,8 @@ class BlockReceiver implements Closeable
   private boolean syncOnClose;
   private long restartBudget;
 
-  BlockReceiver(final ExtendedBlock block, final DataInputStream in,
+  BlockReceiver(final ExtendedBlock block, final StorageType storageType,
+      final DataInputStream in,
       final String inAddr, final String myAddr,
       final BlockConstructionStage stage, 
       final long newGs, final long minBytesRcvd, final long maxBytesRcvd, 
@@ -162,11 +164,11 @@ class BlockReceiver implements Closeable
       // Open local disk out
       //
       if (isDatanode) { //replication or move
-        replicaInfo = datanode.data.createTemporary(block);
+        replicaInfo = datanode.data.createTemporary(storageType, block);
       } else {
         switch (stage) {
         case PIPELINE_SETUP_CREATE:
-          replicaInfo = datanode.data.createRbw(block);
+          replicaInfo = datanode.data.createRbw(storageType, block);
           datanode.notifyNamenodeReceivingBlock(
               block, replicaInfo.getStorageUuid());
           break;
@@ -198,7 +200,7 @@ class BlockReceiver implements Closeable
         case TRANSFER_RBW:
         case TRANSFER_FINALIZED:
           // this is a transfer destination
-          replicaInfo = datanode.data.createTemporary(block);
+          replicaInfo = datanode.data.createTemporary(storageType, block);
           break;
         default: throw new IOException("Unsupported stage " + stage + 
               " while receiving block " + block + " from " + inAddr);

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Fri Jul 25 20:33:09 2014
@@ -52,7 +52,9 @@ import static org.apache.hadoop.hdfs.DFS
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.security.SaslPropertiesResolver;
 
 /**
  * Simple class encapsulating all of the configuration that the DataNode
@@ -86,6 +88,7 @@ public class DNConf {
   
   final String minimumNameNodeVersion;
   final String encryptionAlgorithm;
+  final SaslPropertiesResolver saslPropsResolver;
   final TrustedChannelResolver trustedChannelResolver;
   
   final long xceiverStopTimeout;
@@ -168,6 +171,8 @@ public class DNConf {
         DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
     this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
     this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
+    this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver(
+      conf);
     
     this.xceiverStopTimeout = conf.getLong(
         DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
@@ -186,7 +191,26 @@ public class DNConf {
   String getMinimumNameNodeVersion() {
     return this.minimumNameNodeVersion;
   }
-  
+
+  /**
+   * Returns true if encryption is enabled for DataTransferProtocol.
+   *
+   * @return boolean true if encryption is enabled for DataTransferProtocol
+   */
+  public boolean getEncryptDataTransfer() {
+    return encryptDataTransfer;
+  }
+
+  /**
+   * Returns encryption algorithm configured for DataTransferProtocol, or null
+   * if not configured.
+   *
+   * @return encryption algorithm configured for DataTransferProtocol
+   */
+  public String getEncryptionAlgorithm() {
+    return encryptionAlgorithm;
+  }
+
   public long getXceiverStopTimeout() {
     return xceiverStopTimeout;
   }
@@ -194,4 +218,24 @@ public class DNConf {
   public long getMaxLockedMemory() {
     return maxLockedMemory;
   }
+
+  /**
+   * Returns the SaslPropertiesResolver configured for use with
+   * DataTransferProtocol, or null if not configured.
+   *
+   * @return SaslPropertiesResolver configured for use with DataTransferProtocol
+   */
+  public SaslPropertiesResolver getSaslPropsResolver() {
+    return saslPropsResolver;
+  }
+
+  /**
+   * Returns the TrustedChannelResolver configured for use with
+   * DataTransferProtocol, or null if not configured.
+   *
+   * @return TrustedChannelResolver configured for use with DataTransferProtocol
+   */
+  public TrustedChannelResolver getTrustedChannelResolver() {
+    return trustedChannelResolver;
+  }
 }