Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2014/07/25 22:33:22 UTC

svn commit: r1613514 [3/6] - in /hadoop/common/branches/YARN-1051/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/ hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs-nfs/src/test/java/org/apac...

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Jul 25 20:33:09 2014
@@ -17,10 +17,68 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.BlockingService;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
+import static org.apache.hadoop.util.ExitUtil.terminate;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.channels.SocketChannel;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.management.ObjectName;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,17 +94,44 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.net.DomainPeerServer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
-import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.hadoop.hdfs.protocol.datatransfer.*;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
-import org.apache.hadoop.hdfs.protocolPB.*;
-import org.apache.hadoop.hdfs.security.token.block.*;
+import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -59,7 +144,11 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.*;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.resources.Param;
 import org.apache.hadoop.http.HttpConfig;
@@ -82,22 +171,21 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.*;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.JvmPauseMonitor;
+import org.apache.hadoop.util.ServicePlugin;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
 
-import javax.management.ObjectName;
-
-import java.io.*;
-import java.lang.management.ManagementFactory;
-import java.net.*;
-import java.nio.channels.SocketChannel;
-import java.security.PrivilegedExceptionAction;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-import static org.apache.hadoop.util.ExitUtil.terminate;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.BlockingService;
 
 /**********************************************************
  * DataNode is a class (and program) that stores a set of
@@ -224,6 +312,8 @@ public class DataNode extends Configured
   private final List<String> usersWithLocalPathAccess;
   private final boolean connectToDnViaHostname;
   ReadaheadPool readaheadPool;
+  SaslDataTransferClient saslClient;
+  SaslDataTransferServer saslServer;
   private final boolean getHdfsBlockLocationsEnabled;
   private ObjectName dataNodeInfoBeanName;
   private Thread checkDiskErrorThread = null;
@@ -722,15 +812,10 @@ public class DataNode extends Configured
    */
   void startDataNode(Configuration conf, 
                      List<StorageLocation> dataDirs,
-                    // DatanodeProtocol namenode,
                      SecureResources resources
                      ) throws IOException {
-    if(UserGroupInformation.isSecurityEnabled() && resources == null) {
-      if (!conf.getBoolean("ignore.secure.ports.for.testing", false)) {
-        throw new RuntimeException("Cannot start secure cluster without "
-            + "privileged resources.");
-      }
-    }
+
+    checkSecureConfig(conf, resources);
 
     // settings global for all BPs in the Data Node
     this.secureResources = resources;
@@ -790,6 +875,55 @@ public class DataNode extends Configured
     // Create the ReadaheadPool from the DataNode context so we can
     // exit without having to explicitly shutdown its thread pool.
     readaheadPool = ReadaheadPool.getInstance();
+    saslClient = new SaslDataTransferClient(dnConf.saslPropsResolver,
+      dnConf.trustedChannelResolver,
+      conf.getBoolean(
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
+    saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
+  }
+
+  /**
+   * Checks if the DataNode has a secure configuration if security is enabled.
+   * There are 2 possible configurations that are considered secure:
+   * 1. The server has bound to privileged ports for RPC and HTTP via
+   *   SecureDataNodeStarter.
+   * 2. The configuration enables SASL on DataTransferProtocol and HTTPS (no
+   *   plain HTTP) for the HTTP server.  The SASL handshake guarantees
+   *   authentication of the RPC server before a client transmits a secret, such
+   *   as a block access token.  Similarly, SSL guarantees authentication of the
+   *   HTTP server before a client transmits a secret, such as a delegation
+   *   token.
+   * It is not possible to run with both privileged ports and SASL on
+   * DataTransferProtocol.  For backwards-compatibility, the connection logic
+   * must check if the target port is a privileged port, and if so, skip the
+   * SASL handshake.
+   *
+   * @param conf Configuration to check
+   * @param resources SecuredResources obtained for DataNode
+   * @throws RuntimeException if security enabled, but configuration is insecure
+   */
+  private static void checkSecureConfig(Configuration conf,
+      SecureResources resources) throws RuntimeException {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+    String dataTransferProtection = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY);
+    if (resources != null && dataTransferProtection == null) {
+      return;
+    }
+    if (conf.getBoolean("ignore.secure.ports.for.testing", false)) {
+      return;
+    }
+    if (dataTransferProtection != null &&
+        DFSUtil.getHttpPolicy(conf) == HttpConfig.Policy.HTTPS_ONLY &&
+        resources == null) {
+      return;
+    }
+    throw new RuntimeException("Cannot start secure DataNode without " +
+      "configuring either privileged resources or SASL RPC data transfer " +
+      "protection and SSL for HTTP.  Using privileged resources in " +
+      "combination with SASL RPC data transfer protection is not supported.");
   }
   
   public static String generateUuid() {
@@ -1423,8 +1557,8 @@ public class DataNode extends Configured
     return xmitsInProgress.get();
   }
     
-  private void transferBlock(ExtendedBlock block, DatanodeInfo xferTargets[])
-      throws IOException {
+  private void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
+      StorageType[] xferTargetStorageTypes) throws IOException {
     BPOfferService bpos = getBPOSForBlock(block);
     DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
     
@@ -1460,16 +1594,17 @@ public class DataNode extends Configured
       LOG.info(bpReg + " Starting thread to transfer " + 
                block + " to " + xfersBuilder);                       
 
-      new Daemon(new DataTransfer(xferTargets, block,
+      new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes, block,
           BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
     }
   }
 
   void transferBlocks(String poolId, Block blocks[],
-      DatanodeInfo xferTargets[][]) {
+      DatanodeInfo xferTargets[][], StorageType[][] xferTargetStorageTypes) {
     for (int i = 0; i < blocks.length; i++) {
       try {
-        transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i]);
+        transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i],
+            xferTargetStorageTypes[i]);
       } catch (IOException ie) {
         LOG.warn("Failed to transfer block " + blocks[i], ie);
       }
@@ -1572,6 +1707,7 @@ public class DataNode extends Configured
    */
   private class DataTransfer implements Runnable {
     final DatanodeInfo[] targets;
+    final StorageType[] targetStorageTypes;
     final ExtendedBlock b;
     final BlockConstructionStage stage;
     final private DatanodeRegistration bpReg;
@@ -1582,7 +1718,8 @@ public class DataNode extends Configured
      * Connect to the first item in the target list.  Pass along the 
      * entire target list, the block, and the data.
      */
-    DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage,
+    DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes,
+        ExtendedBlock b, BlockConstructionStage stage,
         final String clientname) {
       if (DataTransferProtocol.LOG.isDebugEnabled()) {
         DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
@@ -1592,6 +1729,7 @@ public class DataNode extends Configured
             + ", targests=" + Arrays.asList(targets));
       }
       this.targets = targets;
+      this.targetStorageTypes = targetStorageTypes;
       this.b = b;
       this.stage = stage;
       BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
@@ -1623,20 +1761,25 @@ public class DataNode extends Configured
         NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
         sock.setSoTimeout(targets.length * dnConf.socketTimeout);
 
+        //
+        // Header info
+        //
+        Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
+        if (isBlockTokenEnabled) {
+          accessToken = blockPoolTokenSecretManager.generateToken(b, 
+              EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
+        }
+
         long writeTimeout = dnConf.socketWriteTimeout + 
                             HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
         OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
         InputStream unbufIn = NetUtils.getInputStream(sock);
-        if (dnConf.encryptDataTransfer && 
-            !dnConf.trustedChannelResolver.isTrusted(sock.getInetAddress())) {
-          IOStreamPair encryptedStreams =
-              DataTransferEncryptor.getEncryptedStreams(
-                  unbufOut, unbufIn,
-                  blockPoolTokenSecretManager.generateDataEncryptionKey(
-                      b.getBlockPoolId()));
-          unbufOut = encryptedStreams.out;
-          unbufIn = encryptedStreams.in;
-        }
+        DataEncryptionKeyFactory keyFactory =
+          getDataEncryptionKeyFactoryForBlock(b);
+        IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+          unbufIn, keyFactory, accessToken, bpReg);
+        unbufOut = saslStreams.out;
+        unbufIn = saslStreams.in;
         
         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
             HdfsConstants.SMALL_BUFFER_SIZE));
@@ -1645,16 +1788,8 @@ public class DataNode extends Configured
             false, false, true, DataNode.this, null, cachingStrategy);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);
 
-        //
-        // Header info
-        //
-        Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
-        if (isBlockTokenEnabled) {
-          accessToken = blockPoolTokenSecretManager.generateToken(b, 
-              EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
-        }
-
-        new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
+        new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
+            clientname, targets, targetStorageTypes, srcNode,
             stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
 
         // send data & checksum
@@ -1696,7 +1831,26 @@ public class DataNode extends Configured
       }
     }
   }
-  
+
+  /**
+   * Returns a new DataEncryptionKeyFactory that generates a key from the
+   * BlockPoolTokenSecretManager, using the block pool ID of the given block.
+   *
+   * @param block for which the factory needs to create a key
+   * @return DataEncryptionKeyFactory for block's block pool ID
+   */
+  DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
+      final ExtendedBlock block) {
+    return new DataEncryptionKeyFactory() {
+      @Override
+      public DataEncryptionKey newDataEncryptionKey() {
+        return dnConf.encryptDataTransfer ?
+          blockPoolTokenSecretManager.generateDataEncryptionKey(
+            block.getBlockPoolId()) : null;
+      }
+    };
+  }
+
   /**
    * After a block becomes finalized, a datanode increases metric counter,
    * notifies namenode, and adds it to the block scanner
@@ -2336,7 +2490,8 @@ public class DataNode extends Configured
    * @param client client name
    */
   void transferReplicaForPipelineRecovery(final ExtendedBlock b,
-      final DatanodeInfo[] targets, final String client) throws IOException {
+      final DatanodeInfo[] targets, final StorageType[] targetStorageTypes,
+      final String client) throws IOException {
     final long storedGS;
     final long visible;
     final BlockConstructionStage stage;
@@ -2369,7 +2524,7 @@ public class DataNode extends Configured
     b.setNumBytes(visible);
 
     if (targets.length > 0) {
-      new DataTransfer(targets, b, stage, client).run();
+      new DataTransfer(targets, targetStorageTypes, b, stage, client).run();
     }
   }
 

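The new checkSecureConfig above reduces to a small decision over a handful of inputs. The following standalone restatement is illustrative only (the names are invented, and the real method throws a RuntimeException instead of returning a boolean), but it makes the accepted combinations easier to scan:

    // Illustrative restatement of the checkSecureConfig logic in the hunk above.
    // securityEnabled    : UserGroupInformation.isSecurityEnabled()
    // hasPrivilegedPorts : resources != null (ports bound by SecureDataNodeStarter)
    // saslConfigured     : dfs.data.transfer.protection is set
    // httpsOnly          : the HTTP policy is HTTPS_ONLY
    // ignoreForTesting   : ignore.secure.ports.for.testing is true
    static boolean isAcceptedSecureConfig(boolean securityEnabled,
        boolean hasPrivilegedPorts, boolean saslConfigured,
        boolean httpsOnly, boolean ignoreForTesting) {
      if (!securityEnabled) {
        return true;                                  // nothing to enforce
      }
      if (hasPrivilegedPorts && !saslConfigured) {
        return true;                                  // option 1: privileged ports only
      }
      if (ignoreForTesting) {
        return true;                                  // test-only escape hatch
      }
      if (saslConfigured && httpsOnly && !hasPrivilegedPorts) {
        return true;                                  // option 2: SASL + HTTPS-only web UI
      }
      return false;                                   // both, or neither, is rejected
    }

Running with both privileged ports and SASL configured, or with neither, falls through to false, which matches the RuntimeException at the end of the real method.
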
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri Jul 25 20:33:09 2014
@@ -36,29 +36,27 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketException;
-import java.net.UnknownHostException;
 import java.nio.channels.ClosedChannelException;
 import java.security.MessageDigest;
 import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
@@ -85,7 +83,6 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.util.DataChecksum;
 
 import com.google.common.base.Preconditions;
-import com.google.common.net.InetAddresses;
 import com.google.protobuf.ByteString;
 
 
@@ -174,24 +171,11 @@ class DataXceiver extends Receiver imple
       dataXceiverServer.addPeer(peer, Thread.currentThread());
       peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
       InputStream input = socketIn;
-      if ((!peer.hasSecureChannel()) && dnConf.encryptDataTransfer &&
-          !dnConf.trustedChannelResolver.isTrusted(getClientAddress(peer))){
-        IOStreamPair encryptedStreams = null;
-        try {
-          encryptedStreams = DataTransferEncryptor.getEncryptedStreams(socketOut,
-              socketIn, datanode.blockPoolTokenSecretManager,
-              dnConf.encryptionAlgorithm);
-        } catch (InvalidMagicNumberException imne) {
-          LOG.info("Failed to read expected encryption handshake from client " +
-              "at " + peer.getRemoteAddressString() + ". Perhaps the client " +
-              "is running an older version of Hadoop which does not support " +
-              "encryption");
-          return;
-        }
-        input = encryptedStreams.in;
-        socketOut = encryptedStreams.out;
-      }
-      input = new BufferedInputStream(input, HdfsConstants.SMALL_BUFFER_SIZE);
+      IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
+        socketIn, datanode.getDatanodeId());
+      input = new BufferedInputStream(saslStreams.in,
+        HdfsConstants.SMALL_BUFFER_SIZE);
+      socketOut = saslStreams.out;
       
       super.initialize(new DataInputStream(input));
       
@@ -263,19 +247,6 @@ class DataXceiver extends Receiver imple
       }
     }
   }
-  
-  /**
-   * Returns InetAddress from peer
-   * The getRemoteAddressString is the form  /ip-address:port
-   * The ip-address is extracted from peer and InetAddress is formed
-   * @param peer
-   * @return
-   * @throws UnknownHostException
-   */
-  private static InetAddress getClientAddress(Peer peer) {
-    return InetAddresses.forString(
-        peer.getRemoteAddressString().split(":")[0].substring(1));
-  }
 
   @Override
   public void requestShortCircuitFds(final ExtendedBlock blk,
@@ -554,9 +525,11 @@ class DataXceiver extends Receiver imple
 
   @Override
   public void writeBlock(final ExtendedBlock block,
+      final StorageType storageType, 
       final Token<BlockTokenIdentifier> blockToken,
       final String clientname,
       final DatanodeInfo[] targets,
+      final StorageType[] targetStorageTypes, 
       final DatanodeInfo srcDataNode,
       final BlockConstructionStage stage,
       final int pipelineSize,
@@ -620,12 +593,13 @@ class DataXceiver extends Receiver imple
       if (isDatanode || 
           stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
         // open a block receiver
-        blockReceiver = new BlockReceiver(block, in, 
+        blockReceiver = new BlockReceiver(block, storageType, in,
             peer.getRemoteAddressString(),
             peer.getLocalAddressString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             clientname, srcDataNode, datanode, requestedChecksum,
             cachingStrategy);
+        
         storageUuid = blockReceiver.getStorageUuid();
       } else {
         storageUuid = datanode.data.recoverClose(
@@ -656,25 +630,20 @@ class DataXceiver extends Receiver imple
           OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
               writeTimeout);
           InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
-          if (dnConf.encryptDataTransfer &&
-              !dnConf.trustedChannelResolver.isTrusted(mirrorSock.getInetAddress())) {
-            IOStreamPair encryptedStreams =
-                DataTransferEncryptor.getEncryptedStreams(
-                    unbufMirrorOut, unbufMirrorIn,
-                    datanode.blockPoolTokenSecretManager
-                        .generateDataEncryptionKey(block.getBlockPoolId()));
-            
-            unbufMirrorOut = encryptedStreams.out;
-            unbufMirrorIn = encryptedStreams.in;
-          }
+          DataEncryptionKeyFactory keyFactory =
+            datanode.getDataEncryptionKeyFactoryForBlock(block);
+          IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock,
+            unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]);
+          unbufMirrorOut = saslStreams.out;
+          unbufMirrorIn = saslStreams.in;
           mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
               HdfsConstants.SMALL_BUFFER_SIZE));
           mirrorIn = new DataInputStream(unbufMirrorIn);
 
-          new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
-              clientname, targets, srcDataNode, stage, pipelineSize,
-              minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum,
-              cachingStrategy);
+          new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
+              blockToken, clientname, targets, targetStorageTypes, srcDataNode,
+              stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
+              latestGenerationStamp, requestedChecksum, cachingStrategy);
 
           mirrorOut.flush();
 
@@ -789,7 +758,8 @@ class DataXceiver extends Receiver imple
   public void transferBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
-      final DatanodeInfo[] targets) throws IOException {
+      final DatanodeInfo[] targets,
+      final StorageType[] targetStorageTypes) throws IOException {
     checkAccess(socketOut, true, blk, blockToken,
         Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
     previousOpClientName = clientName;
@@ -798,7 +768,8 @@ class DataXceiver extends Receiver imple
     final DataOutputStream out = new DataOutputStream(
         getOutputStream());
     try {
-      datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
+      datanode.transferReplicaForPipelineRecovery(blk, targets,
+          targetStorageTypes, clientName);
       writeResponse(Status.SUCCESS, null, out);
     } finally {
       IOUtils.closeStream(out);
@@ -976,6 +947,7 @@ class DataXceiver extends Receiver imple
 
   @Override
   public void replaceBlock(final ExtendedBlock block,
+      final StorageType storageType, 
       final Token<BlockTokenIdentifier> blockToken,
       final String delHint,
       final DatanodeInfo proxySource) throws IOException {
@@ -1026,17 +998,12 @@ class DataXceiver extends Receiver imple
       OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
           dnConf.socketWriteTimeout);
       InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
-      if (dnConf.encryptDataTransfer && 
-          !dnConf.trustedChannelResolver.isTrusted(
-              proxySock.getInetAddress())) {
-        IOStreamPair encryptedStreams =
-            DataTransferEncryptor.getEncryptedStreams(
-                unbufProxyOut, unbufProxyIn,
-                datanode.blockPoolTokenSecretManager
-                    .generateDataEncryptionKey(block.getBlockPoolId()));
-        unbufProxyOut = encryptedStreams.out;
-        unbufProxyIn = encryptedStreams.in;
-      }
+      DataEncryptionKeyFactory keyFactory =
+        datanode.getDataEncryptionKeyFactoryForBlock(block);
+      IOStreamPair saslStreams = datanode.saslClient.socketSend(proxySock,
+        unbufProxyOut, unbufProxyIn, keyFactory, blockToken, proxySource);
+      unbufProxyOut = saslStreams.out;
+      unbufProxyIn = saslStreams.in;
       
       proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut, 
           HdfsConstants.SMALL_BUFFER_SIZE));
@@ -1066,8 +1033,8 @@ class DataXceiver extends Receiver imple
       DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
           checksumInfo.getChecksum());
       // open a block receiver and check if the block does not exist
-      blockReceiver = new BlockReceiver(
-          block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
+      blockReceiver = new BlockReceiver(block, storageType,
+          proxyReply, proxySock.getRemoteSocketAddress().toString(),
           proxySock.getLocalSocketAddress().toString(),
           null, 0, 0, 0, "", null, datanode, remoteChecksum,
           CachingStrategy.newDropBehind());

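The DataXceiver hunks above (stream setup in run(), the mirror connection in writeBlock, and the proxy connection in replaceBlock), like the DataTransfer hunk in DataNode.java, all replace the old DataTransferEncryptor handshake with one pattern: hand the raw socket streams to the new SASL helpers and use whatever IOStreamPair comes back. A condensed sketch of both sides, assuming a connected socket and a block token are already in hand (variable names are placeholders):

    // Client side: wrap the raw streams, then layer buffering on the wrapped pair.
    OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
    InputStream unbufIn = NetUtils.getInputStream(sock);
    DataEncryptionKeyFactory keyFactory =
        datanode.getDataEncryptionKeyFactoryForBlock(block);
    IOStreamPair saslStreams = datanode.saslClient.socketSend(
        sock, unbufOut, unbufIn, keyFactory, blockToken, targetDatanodeId);
    DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
        saslStreams.out, HdfsConstants.SMALL_BUFFER_SIZE));
    DataInputStream in = new DataInputStream(saslStreams.in);

    // Server side (run() above): the mirror image, via saslServer.receive.
    IOStreamPair serverStreams = datanode.saslServer.receive(
        peer, socketOut, socketIn, datanode.getDatanodeId());

Whether the streams are actually wrapped (SASL, encryption, or passed through untouched) is now decided inside the SASL classes, which receive the resolvers at construction time, rather than at every call site.
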
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Fri Jul 25 20:33:09 2014
@@ -28,6 +28,7 @@ import java.util.Map;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -176,8 +177,8 @@ public interface FsDatasetSpi<V extends 
    * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
    */
-  public ReplicaInPipelineInterface createTemporary(ExtendedBlock b
-      ) throws IOException;
+  public ReplicaInPipelineInterface createTemporary(StorageType storageType,
+      ExtendedBlock b) throws IOException;
 
   /**
    * Creates a RBW replica and returns the meta info of the replica
@@ -186,8 +187,8 @@ public interface FsDatasetSpi<V extends 
    * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
    */
-  public ReplicaInPipelineInterface createRbw(ExtendedBlock b
-      ) throws IOException;
+  public ReplicaInPipelineInterface createRbw(StorageType storageType,
+      ExtendedBlock b) throws IOException;
 
   /**
    * Recovers a RBW replica and returns the meta info of the replica

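With the interface change above, callers state the desired storage medium when a replica is created. A minimal sketch of a caller of the new signature (the helper and its inputs are hypothetical, not part of this commit):

    // Hypothetical caller of the new FsDatasetSpi.createRbw signature; the dataset
    // reference would come from the DataNode in real code.
    ReplicaInPipelineInterface createDiskRbw(FsDatasetSpi<?> dataset, ExtendedBlock block)
        throws IOException {
      // The storage type is chosen by the caller (e.g. taken from the write request)
      // rather than guessed by the dataset implementation.
      return dataset.createRbw(StorageType.DISK, block);
    }
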
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Fri Jul 25 20:33:09 2014
@@ -17,6 +17,28 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -24,12 +46,37 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.datanode.*;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
+import org.apache.hadoop.hdfs.server.datanode.Replica;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -43,15 +90,6 @@ import org.apache.hadoop.util.DiskChecke
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 
-import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
-import javax.management.StandardMBean;
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.*;
-import java.util.concurrent.Executor;
-
 /**************************************************
  * FSDataset manages a set of data blocks.  Each block
  * has a unique name and an extent on disk.
@@ -736,8 +774,8 @@ class FsDatasetImpl implements FsDataset
   }
 
   @Override // FsDatasetSpi
-  public synchronized ReplicaInPipeline createRbw(ExtendedBlock b)
-      throws IOException {
+  public synchronized ReplicaInPipeline createRbw(StorageType storageType,
+      ExtendedBlock b) throws IOException {
     ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), 
         b.getBlockId());
     if (replicaInfo != null) {
@@ -746,7 +784,7 @@ class FsDatasetImpl implements FsDataset
       " and thus cannot be created.");
     }
     // create a new block
-    FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
+    FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
     // create a rbw file to hold block in the designated volume
     File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), 
@@ -874,8 +912,8 @@ class FsDatasetImpl implements FsDataset
   }
 
   @Override // FsDatasetSpi
-  public synchronized ReplicaInPipeline createTemporary(ExtendedBlock b)
-      throws IOException {
+  public synchronized ReplicaInPipeline createTemporary(StorageType storageType,
+      ExtendedBlock b) throws IOException {
     ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId());
     if (replicaInfo != null) {
       throw new ReplicaAlreadyExistsException("Block " + b +
@@ -883,7 +921,7 @@ class FsDatasetImpl implements FsDataset
           " and thus cannot be created.");
     }
     
-    FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
+    FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
     // create a temporary file to hold block in the designated volume
     File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
     ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), 

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java Fri Jul 25 20:33:09 2014
@@ -18,13 +18,17 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
-import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.Time;
 
 class FsVolumeList {
   /**
@@ -52,11 +56,18 @@ class FsVolumeList {
    * by a single thread and next volume is chosen with no concurrent
    * update to {@link #volumes}.
    * @param blockSize free space needed on the volume
+   * @param storageType the desired {@link StorageType} 
    * @return next volume to store the block in.
    */
-  // TODO should choose volume with storage type
-  synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException {
-    return blockChooser.chooseVolume(volumes, blockSize);
+  synchronized FsVolumeImpl getNextVolume(StorageType storageType,
+      long blockSize) throws IOException {
+    final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size());
+    for(FsVolumeImpl v : volumes) {
+      if (v.getStorageType() == storageType) {
+        list.add(v);
+      }
+    }
+    return blockChooser.chooseVolume(list, blockSize);
   }
     
   long getDfsUsed() throws IOException {

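The change above is the whole of the storage-type awareness at this layer: narrow the candidate list to volumes of the requested type, then delegate to the existing VolumeChoosingPolicy. A self-contained illustration of that filter-then-choose step (not the committed code; the types are simplified stand-ins):

    import java.util.ArrayList;
    import java.util.List;

    public class VolumeChoiceSketch {
      enum Kind { DISK, SSD }                          // stand-in for StorageType

      static class Vol {
        final Kind kind; final long free;
        Vol(Kind kind, long free) { this.kind = kind; this.free = free; }
      }

      /** Keep only volumes of the requested kind, then pick the first with room. */
      static Vol choose(List<Vol> volumes, Kind wanted, long blockSize) {
        List<Vol> candidates = new ArrayList<Vol>();
        for (Vol v : volumes) {
          if (v.kind == wanted) {
            candidates.add(v);                         // same filter as the hunk above
          }
        }
        for (Vol v : candidates) {                     // stands in for chooseVolume(...)
          if (v.free >= blockSize) {
            return v;
          }
        }
        return null;                                   // the real policy reports an error instead
      }
    }
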
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Fri Jul 25 20:33:09 2014
@@ -128,7 +128,8 @@ public class DatanodeWebHdfsMethods {
             "://" + nnId);
     boolean isLogical = HAUtil.isLogicalUri(conf, nnUri);
     if (isLogical) {
-      token.setService(HAUtil.buildTokenServiceForLogicalUri(nnUri));
+      token.setService(HAUtil.buildTokenServiceForLogicalUri(nnUri,
+          HdfsConstants.HDFS_URI_SCHEME));
     } else {
       token.setService(SecurityUtil.buildTokenService(nnUri));
     }

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java Fri Jul 25 20:33:09 2014
@@ -48,6 +48,15 @@ public interface FSClusterStats {
    * @return Number of datanodes that are both alive and not decommissioned.
    */
   public int getNumDatanodesInService();
+
+  /**
+   * an indication of the average load of non-decommission(ing|ed) nodes
+   * eligible for block placement
+   * 
+   * @return average of the in service number of block transfers and block
+   *         writes that are currently occurring on the cluster.
+   */
+  public double getInServiceXceiverAverage();
 }
     
     

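The FSNamesystem hunk later in this commit implements this average as the in-service xceiver count divided by the number of in-service datanodes, defaulting to 0 when there are no such nodes. A quick worked example of that formula (the figures are illustrative):

    // Worked example of getInServiceXceiverAverage() as implemented later in this diff.
    int inServiceXceivers = 120;   // illustrative figure
    int inServiceNodes = 40;       // illustrative figure
    double avgLoad = inServiceNodes != 0
        ? (double) inServiceXceivers / inServiceNodes  // 3.0 transfers/writes per node
        : 0;
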
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Jul 25 20:33:09 2014
@@ -1074,10 +1074,11 @@ public class FSEditLog implements LogsPu
     logEdit(op);
   }
   
-  void logRemoveXAttrs(String src, List<XAttr> xAttrs) {
+  void logRemoveXAttrs(String src, List<XAttr> xAttrs, boolean toLogRpcIds) {
     final RemoveXAttrOp op = RemoveXAttrOp.getInstance();
     op.src = src;
     op.xAttrs = xAttrs;
+    logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
 

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Fri Jul 25 20:33:09 2014
@@ -821,6 +821,10 @@ public class FSEditLogLoader {
       RemoveXAttrOp removeXAttrOp = (RemoveXAttrOp) op;
       fsDir.unprotectedRemoveXAttrs(removeXAttrOp.src,
           removeXAttrOp.xAttrs);
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(removeXAttrOp.rpcClientId,
+            removeXAttrOp.rpcCallId);
+      }
       break;
     }
     default:

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Fri Jul 25 20:33:09 2014
@@ -3551,6 +3551,7 @@ public abstract class FSEditLogOp {
       XAttrEditLogProto p = XAttrEditLogProto.parseDelimitedFrom(in);
       src = p.getSrc();
       xAttrs = PBHelper.convertXAttrs(p.getXAttrsList());
+      readRpcIds(in, logVersion);
     }
 
     @Override
@@ -3561,18 +3562,22 @@ public abstract class FSEditLogOp {
       }
       b.addAllXAttrs(PBHelper.convertXAttrProto(xAttrs));
       b.build().writeDelimitedTo(out);
+      // clientId and callId
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
     protected void toXml(ContentHandler contentHandler) throws SAXException {
       XMLUtils.addSaxString(contentHandler, "SRC", src);
       appendXAttrsToXml(contentHandler, xAttrs);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
 
     @Override
     void fromXml(Stanza st) throws InvalidXmlException {
       src = st.getValue("SRC");
       xAttrs = readXAttrsFromXml(st);
+      readRpcIdsFromXml(st);
     }
   }
   

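Read together with the FSEditLog and FSEditLogLoader hunks earlier in this commit, the RemoveXAttrOp change completes a standard retry-cache round trip: the RPC client id and call id are written when the op is logged, read back when the edit is parsed, and used to repopulate the retry cache on replay so a retried removeXAttr call is not applied twice. Condensed to the three touch points, using only calls that appear in the hunks:

    // 1. Logging (FSEditLog.logRemoveXAttrs): attach the ids when the call came in over RPC.
    logRpcIds(op, toLogRpcIds);
    logEdit(op);

    // 2. RemoveXAttrOp (above): the ids travel with the serialized op and its XML form.
    writeRpcIds(rpcClientId, rpcCallId, out);
    readRpcIds(in, logVersion);

    // 3. Replay (FSEditLogLoader): remember the call so a client retry is served from cache.
    if (toAddRetryCache) {
      fsNamesys.addCacheEntry(removeXAttrOp.rpcClientId, removeXAttrOp.rpcCallId);
    }
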
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Jul 25 20:33:09 2014
@@ -225,6 +225,7 @@ public class FSImage implements Closeabl
       NNStorage.checkVersionUpgradable(storage.getLayoutVersion());
     }
     if (startOpt != StartupOption.UPGRADE
+        && startOpt != StartupOption.UPGRADEONLY
         && !RollingUpgradeStartupOption.STARTED.matches(startOpt)
         && layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION
         && layoutVersion != HdfsConstants.NAMENODE_LAYOUT_VERSION) {
@@ -263,6 +264,7 @@ public class FSImage implements Closeabl
     // 3. Do transitions
     switch(startOpt) {
     case UPGRADE:
+    case UPGRADEONLY:
       doUpgrade(target);
       return false; // upgrade saved image already
     case IMPORT:
@@ -748,11 +750,13 @@ public class FSImage implements Closeabl
       editLog.recoverUnclosedStreams();
     } else if (HAUtil.isHAEnabled(conf, nameserviceId)
         && (startOpt == StartupOption.UPGRADE
+            || startOpt == StartupOption.UPGRADEONLY
             || RollingUpgradeStartupOption.ROLLBACK.matches(startOpt))) {
       // This NN is HA, but we're doing an upgrade or a rollback of rolling
       // upgrade so init the edit log for write.
       editLog.initJournalsForWrite();
-      if (startOpt == StartupOption.UPGRADE) {
+      if (startOpt == StartupOption.UPGRADE
+          || startOpt == StartupOption.UPGRADEONLY) {
         long sharedLogCTime = editLog.getSharedLogCTime();
         if (this.storage.getCTime() < sharedLogCTime) {
           throw new IOException("It looks like the shared log is already " +

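The FSImage edits above apply one rule at each site that previously special-cased StartupOption.UPGRADE: UPGRADEONLY must take the same path. A hedged one-liner capturing that rule (the helper name is invented; the committed code repeats the comparison inline):

    // Hypothetical helper; the real code inlines this comparison at each call site.
    private static boolean isUpgradeStartup(StartupOption startOpt) {
      return startOpt == StartupOption.UPGRADE
          || startOpt == StartupOption.UPGRADEONLY;
    }
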
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Fri Jul 25 20:33:09 2014
@@ -614,6 +614,16 @@ public class FSImageFormat {
     INodeDirectory parentINode = fsDir.rootDir;
     for (long i = 0; i < numFiles; i++) {
       pathComponents = FSImageSerialization.readPathComponents(in);
+      for (int j=0; j < pathComponents.length; j++) {
+        byte[] newComponent = renameReservedComponentOnUpgrade
+            (pathComponents[j], getLayoutVersion());
+        if (!Arrays.equals(newComponent, pathComponents[j])) {
+          String oldPath = DFSUtil.byteArray2PathString(pathComponents);
+          pathComponents[j] = newComponent;
+          String newPath = DFSUtil.byteArray2PathString(pathComponents);
+          LOG.info("Renaming reserved path " + oldPath + " to " + newPath);
+        }
+      }
       final INode newNode = loadINode(
           pathComponents[pathComponents.length-1], false, in, counter);
 
@@ -926,6 +936,7 @@ public class FSImageFormat {
           oldnode = namesystem.dir.getInode(cons.getId()).asFile();
           inSnapshot = true;
         } else {
+          path = renameReservedPathsOnUpgrade(path, getLayoutVersion());
           final INodesInPath iip = fsDir.getLastINodeInPath(path);
           oldnode = INodeFile.valueOf(iip.getINode(0), path);
         }

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Jul 25 20:33:09 2014
@@ -83,6 +83,9 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT;
+
 import static org.apache.hadoop.util.Time.now;
 
 import java.io.BufferedWriter;
@@ -528,6 +531,8 @@ public class FSNamesystem implements Nam
 
   private final FSImage fsImage;
 
+  private boolean randomizeBlockLocationsPerBlock;
+
   /**
    * Notify that loading of this FSDirectory is complete, and
    * it is imageLoaded for use
@@ -837,6 +842,10 @@ public class FSNamesystem implements Nam
       alwaysUseDelegationTokensForTests = conf.getBoolean(
           DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
           DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);
+      
+      this.randomizeBlockLocationsPerBlock = conf.getBoolean(
+          DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK,
+          DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT);
 
       this.dtSecretManager = createDelegationTokenSecretManager(conf);
       this.dir = new FSDirectory(this, conf);
@@ -979,7 +988,8 @@ public class FSNamesystem implements Nam
       }
       // This will start a new log segment and write to the seen_txid file, so
       // we shouldn't do it when coming up in standby state
-      if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)) {
+      if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)
+          || (haEnabled && startOpt == StartupOption.UPGRADEONLY)) {
         fsImage.openEditLogForWrite();
       }
       success = true;
@@ -1699,17 +1709,17 @@ public class FSNamesystem implements Nam
     LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true,
         true);
     if (blocks != null) {
-      blockManager.getDatanodeManager().sortLocatedBlocks(
-          clientMachine, blocks.getLocatedBlocks());
-      
+      blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine,
+          blocks.getLocatedBlocks(), randomizeBlockLocationsPerBlock);
+
       // lastBlock is not part of getLocatedBlocks(), might need to sort it too
       LocatedBlock lastBlock = blocks.getLastLocatedBlock();
       if (lastBlock != null) {
         ArrayList<LocatedBlock> lastBlockList =
             Lists.newArrayListWithCapacity(1);
         lastBlockList.add(lastBlock);
-        blockManager.getDatanodeManager().sortLocatedBlocks(
-                              clientMachine, lastBlockList);
+        blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine,
+            lastBlockList, randomizeBlockLocationsPerBlock);
       }
     }
     return blocks;
@@ -7319,7 +7329,18 @@ public class FSNamesystem implements Nam
 
   @Override // FSClusterStats
   public int getNumDatanodesInService() {
-    return getNumLiveDataNodes() - getNumDecomLiveDataNodes();
+    return datanodeStatistics.getNumDatanodesInService();
+  }
+  
+  @Override // for block placement strategy
+  public double getInServiceXceiverAverage() {
+    double avgLoad = 0;
+    final int nodes = getNumDatanodesInService();
+    if (nodes != 0) {
+      final int xceivers = datanodeStatistics.getInServiceXceiverCount();
+      avgLoad = (double)xceivers/nodes;
+    }
+    return avgLoad;
   }
 
   public SnapshotManager getSnapshotManager() {
@@ -8258,11 +8279,12 @@ public class FSNamesystem implements Nam
     nnConf.checkXAttrsConfigFlag();
     FSPermissionChecker pc = getPermissionChecker();
     boolean getAll = xAttrs == null || xAttrs.isEmpty();
-    List<XAttr> filteredXAttrs = null;
     if (!getAll) {
-      filteredXAttrs = XAttrPermissionFilter.filterXAttrsForApi(pc, xAttrs);
-      if (filteredXAttrs.isEmpty()) {
-        return filteredXAttrs;
+      try {
+        XAttrPermissionFilter.checkPermissionForApi(pc, xAttrs);
+      } catch (AccessControlException e) {
+        logAuditEvent(false, "getXAttrs", src);
+        throw e;
       }
     }
     checkOperation(OperationCategory.READ);
@@ -8281,15 +8303,21 @@ public class FSNamesystem implements Nam
         if (filteredAll == null || filteredAll.isEmpty()) {
           return null;
         }
-        List<XAttr> toGet = Lists.newArrayListWithCapacity(filteredXAttrs.size());
-        for (XAttr xAttr : filteredXAttrs) {
+        List<XAttr> toGet = Lists.newArrayListWithCapacity(xAttrs.size());
+        for (XAttr xAttr : xAttrs) {
+          boolean foundIt = false;
           for (XAttr a : filteredAll) {
             if (xAttr.getNameSpace() == a.getNameSpace()
                 && xAttr.getName().equals(a.getName())) {
               toGet.add(a);
+              foundIt = true;
               break;
             }
           }
+          if (!foundIt) {
+            throw new IOException(
+                "At least one of the attributes provided was not found.");
+          }
         }
         return toGet;
       }
@@ -8323,17 +8351,42 @@ public class FSNamesystem implements Nam
       readUnlock();
     }
   }
-  
+
+  /**
+   * Remove an xattr for a file or directory.
+   *
+   * @param src
+   *          - path to remove the xattr from
+   * @param xAttr
+   *          - xAttr to remove
+   * @throws AccessControlException
+   * @throws SafeModeException
+   * @throws UnresolvedLinkException
+   * @throws IOException
+   */
   void removeXAttr(String src, XAttr xAttr) throws IOException {
-    nnConf.checkXAttrsConfigFlag();
-    HdfsFileStatus resultingStat = null;
-    FSPermissionChecker pc = getPermissionChecker();
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
+    boolean success = false;
     try {
-      XAttrPermissionFilter.checkPermissionForApi(pc, xAttr);
+      removeXAttrInt(src, xAttr, cacheEntry != null);
+      success = true;
     } catch (AccessControlException e) {
       logAuditEvent(false, "removeXAttr", src);
       throw e;
+    } finally {
+      RetryCache.setState(cacheEntry, success);
     }
+  }
+
+  void removeXAttrInt(String src, XAttr xAttr, boolean logRetryCache)
+      throws IOException {
+    nnConf.checkXAttrsConfigFlag();
+    HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
+      XAttrPermissionFilter.checkPermissionForApi(pc, xAttr);
     checkOperation(OperationCategory.WRITE);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
@@ -8347,12 +8400,12 @@ public class FSNamesystem implements Nam
       xAttrs.add(xAttr);
       List<XAttr> removedXAttrs = dir.removeXAttrs(src, xAttrs);
       if (removedXAttrs != null && !removedXAttrs.isEmpty()) {
-        getEditLog().logRemoveXAttrs(src, removedXAttrs);
+        getEditLog().logRemoveXAttrs(src, removedXAttrs, logRetryCache);
+      } else {
+        throw new IOException(
+            "No matching attributes found for remove operation");
       }
       resultingStat = getAuditFileInfo(src, false);
-    } catch (AccessControlException e) {
-      logAuditEvent(false, "removeXAttr", src);
-      throw e;
     } finally {
       writeUnlock();
     }

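The FSNamesystem hunks above add a per-block location randomization flag that is passed to sortLocatedBlocks, allow an HA NameNode started with UPGRADEONLY to open the edit log for write, tighten the getXAttrs/removeXAttr paths (upfront permission checks, retry-cache protection, and errors when a requested attribute is missing), and expose an average in-service xceiver load for block placement. A JDK-only sketch of that last computation, total in-service xceivers divided by in-service datanodes with a guard for the zero case, is shown below; DatanodeStats is a hypothetical stand-in for Hadoop's DatanodeStatistics.

    public class XceiverAverageSketch {
      // Hypothetical snapshot of cluster stats; in HDFS these values come
      // from DatanodeStatistics, not from this class.
      static class DatanodeStats {
        final int inServiceNodes;
        final int inServiceXceivers;
        DatanodeStats(int nodes, int xceivers) {
          this.inServiceNodes = nodes;
          this.inServiceXceivers = xceivers;
        }
      }

      // Mirrors the guarded division in getInServiceXceiverAverage():
      // return 0.0 when no datanodes are in service instead of dividing by zero.
      static double inServiceXceiverAverage(DatanodeStats stats) {
        if (stats.inServiceNodes == 0) {
          return 0.0;
        }
        return (double) stats.inServiceXceivers / stats.inServiceNodes;
      }

      public static void main(String[] args) {
        System.out.println(inServiceXceiverAverage(new DatanodeStats(0, 0)));   // 0.0
        System.out.println(inServiceXceiverAverage(new DatanodeStats(4, 10)));  // 2.5
      }
    }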
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Fri Jul 25 20:33:09 2014
@@ -71,6 +71,8 @@ public class FileJournalManager implemen
     NameNodeFile.EDITS.getName() + "_(\\d+)-(\\d+)");
   private static final Pattern EDITS_INPROGRESS_REGEX = Pattern.compile(
     NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)");
+  private static final Pattern EDITS_INPROGRESS_STALE_REGEX = Pattern.compile(
+      NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+).*(\\S+)");
 
   private File currentInProgress = null;
 
@@ -162,8 +164,7 @@ public class FileJournalManager implemen
       throws IOException {
     LOG.info("Purging logs older than " + minTxIdToKeep);
     File[] files = FileUtil.listFiles(sd.getCurrentDir());
-    List<EditLogFile> editLogs = 
-      FileJournalManager.matchEditLogs(files);
+    List<EditLogFile> editLogs = matchEditLogs(files, true);
     for (EditLogFile log : editLogs) {
       if (log.getFirstTxId() < minTxIdToKeep &&
           log.getLastTxId() < minTxIdToKeep) {
@@ -244,8 +245,13 @@ public class FileJournalManager implemen
   public static List<EditLogFile> matchEditLogs(File logDir) throws IOException {
     return matchEditLogs(FileUtil.listFiles(logDir));
   }
-  
+
   static List<EditLogFile> matchEditLogs(File[] filesInStorage) {
+    return matchEditLogs(filesInStorage, false);
+  }
+
+  private static List<EditLogFile> matchEditLogs(File[] filesInStorage,
+      boolean forPurging) {
     List<EditLogFile> ret = Lists.newArrayList();
     for (File f : filesInStorage) {
       String name = f.getName();
@@ -256,6 +262,7 @@ public class FileJournalManager implemen
           long startTxId = Long.parseLong(editsMatch.group(1));
           long endTxId = Long.parseLong(editsMatch.group(2));
           ret.add(new EditLogFile(f, startTxId, endTxId));
+          continue;
         } catch (NumberFormatException nfe) {
           LOG.error("Edits file " + f + " has improperly formatted " +
                     "transaction ID");
@@ -270,12 +277,30 @@ public class FileJournalManager implemen
           long startTxId = Long.parseLong(inProgressEditsMatch.group(1));
           ret.add(
               new EditLogFile(f, startTxId, HdfsConstants.INVALID_TXID, true));
+          continue;
         } catch (NumberFormatException nfe) {
           LOG.error("In-progress edits file " + f + " has improperly " +
                     "formatted transaction ID");
           // skip
         }
       }
+      if (forPurging) {
+        // Check for in-progress stale edits
+        Matcher staleInprogressEditsMatch = EDITS_INPROGRESS_STALE_REGEX
+            .matcher(name);
+        if (staleInprogressEditsMatch.matches()) {
+          try {
+            long startTxId = Long.valueOf(staleInprogressEditsMatch.group(1));
+            ret.add(new EditLogFile(f, startTxId, HdfsConstants.INVALID_TXID,
+                true));
+            continue;
+          } catch (NumberFormatException nfe) {
+            LOG.error("In-progress stale edits file " + f + " has improperly "
+                + "formatted transaction ID");
+            // skip
+          }
+        }
+      }
     }
     return ret;
   }

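The FileJournalManager change above lets log purging also match stale in-progress edit files (a segment name with a trailing suffix left behind by recovery), while the added continue statements keep a file from being considered by more than one pattern. A JDK-only sketch of the matching is below; it assumes the in-progress prefix resolves to the literal "edits_inprogress", and the file names are illustrative.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class StaleEditsMatchSketch {
      // Patterns copied from the hunk above, with the prefix written out
      // literally so the example is self-contained.
      static final Pattern EDITS_INPROGRESS =
          Pattern.compile("edits_inprogress_(\\d+)");
      static final Pattern EDITS_INPROGRESS_STALE =
          Pattern.compile("edits_inprogress_(\\d+).*(\\S+)");

      public static void main(String[] args) {
        String[] names = {
            "edits_inprogress_0000000000000000042",        // live segment
            "edits_inprogress_0000000000000000042.stale",  // stale segment (illustrative suffix)
        };
        for (String name : names) {
          if (EDITS_INPROGRESS.matcher(name).matches()) {
            // The real code handles this case first and continues, so only
            // files that are not a plain in-progress segment reach the stale check.
            System.out.println(name + " -> live in-progress segment");
            continue;
          }
          Matcher stale = EDITS_INPROGRESS_STALE.matcher(name);
          if (stale.matches()) {
            System.out.println(name + " -> stale segment, startTxId="
                + Long.parseLong(stale.group(1)));
          }
        }
      }
    }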
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java Fri Jul 25 20:33:09 2014
@@ -836,7 +836,7 @@ public class NNStorage extends Storage i
    */
   void processStartupOptionsForUpgrade(StartupOption startOpt, int layoutVersion)
       throws IOException {
-    if (startOpt == StartupOption.UPGRADE) {
+    if (startOpt == StartupOption.UPGRADE || startOpt == StartupOption.UPGRADEONLY) {
       // If upgrade from a release that does not support federation,
       // if clusterId is provided in the startupOptions use it.
       // Else generate a new cluster ID      

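NNStorage above extends the same treatment to UPGRADEONLY when deciding how to handle the cluster ID while upgrading from a pre-federation layout: honor a cluster ID supplied on the command line, otherwise generate a new one. The JDK-only sketch below shows that decision in isolation; newClusterID() and its "CID-" format are placeholders, the pre-federation check is omitted, and none of this is the Hadoop implementation.

    import java.util.UUID;

    public class UpgradeClusterIdSketch {
      enum StartupOption { REGULAR, UPGRADE, UPGRADEONLY }

      // Placeholder for NNStorage.newClusterID(); the format is illustrative only.
      static String newClusterID() {
        return "CID-" + UUID.randomUUID();
      }

      // Mirrors the widened condition: both upgrade variants pick a cluster ID.
      static String clusterIdForStartup(StartupOption opt, String providedClusterId,
          String existingClusterId) {
        if (opt == StartupOption.UPGRADE || opt == StartupOption.UPGRADEONLY) {
          if (providedClusterId != null && !providedClusterId.isEmpty()) {
            return providedClusterId;   // honor -clusterid from the command line
          }
          return newClusterID();        // otherwise mint a fresh one
        }
        return existingClusterId;       // non-upgrade startups keep what is on disk
      }

      public static void main(String[] args) {
        System.out.println(clusterIdForStartup(StartupOption.UPGRADEONLY, "CID-example", null));
        System.out.println(clusterIdForStartup(StartupOption.UPGRADEONLY, null, null));
      }
    }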
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Jul 25 20:33:09 2014
@@ -210,6 +210,9 @@ public class NameNode implements NameNod
       + StartupOption.UPGRADE.getName() + 
         " [" + StartupOption.CLUSTERID.getName() + " cid]" +
         " [" + StartupOption.RENAMERESERVED.getName() + "<k-v pairs>] ] | \n\t["
+      + StartupOption.UPGRADEONLY.getName() + 
+        " [" + StartupOption.CLUSTERID.getName() + " cid]" +
+        " [" + StartupOption.RENAMERESERVED.getName() + "<k-v pairs>] ] | \n\t["
       + StartupOption.ROLLBACK.getName() + "] | \n\t["
       + StartupOption.ROLLINGUPGRADE.getName() + " <"
       + RollingUpgradeStartupOption.DOWNGRADE.name().toLowerCase() + "|"
@@ -713,6 +716,7 @@ public class NameNode implements NameNod
    * <li>{@link StartupOption#BACKUP BACKUP} - start backup node</li>
    * <li>{@link StartupOption#CHECKPOINT CHECKPOINT} - start checkpoint node</li>
    * <li>{@link StartupOption#UPGRADE UPGRADE} - start the cluster  
+   * <li>{@link StartupOption#UPGRADEONLY UPGRADEONLY} - upgrade the cluster  
    * upgrade and create a snapshot of the current file system state</li> 
    * <li>{@link StartupOption#RECOVER RECOVERY} - recover name node
    * metadata</li>
@@ -767,7 +771,8 @@ public class NameNode implements NameNod
   }
 
   protected HAState createHAState(StartupOption startOpt) {
-    if (!haEnabled || startOpt == StartupOption.UPGRADE) {
+    if (!haEnabled || startOpt == StartupOption.UPGRADE 
+        || startOpt == StartupOption.UPGRADEONLY) {
       return ACTIVE_STATE;
     } else {
       return STANDBY_STATE;
@@ -1198,8 +1203,10 @@ public class NameNode implements NameNod
         startOpt = StartupOption.BACKUP;
       } else if (StartupOption.CHECKPOINT.getName().equalsIgnoreCase(cmd)) {
         startOpt = StartupOption.CHECKPOINT;
-      } else if (StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd)) {
-        startOpt = StartupOption.UPGRADE;
+      } else if (StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd)
+          || StartupOption.UPGRADEONLY.getName().equalsIgnoreCase(cmd)) {
+        startOpt = StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd) ? 
+            StartupOption.UPGRADE : StartupOption.UPGRADEONLY;
         /* Can be followed by CLUSTERID with a required parameter or
          * RENAMERESERVED with an optional parameter
          */
@@ -1407,6 +1414,12 @@ public class NameNode implements NameNod
         terminate(0);
         return null; // avoid javac warning
       }
+      case UPGRADEONLY: {
+        DefaultMetricsSystem.initialize("NameNode");
+        new NameNode(conf);
+        terminate(0);
+        return null;
+      }
       default: {
         DefaultMetricsSystem.initialize("NameNode");
         return new NameNode(conf);

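The NameNode hunks above wire in the UPGRADEONLY startup option end to end: the usage string advertises it with the same optional clusterid and renameReserved arguments as UPGRADE, argument parsing maps both spellings onto the right enum value, createHAState forces the active state for it, and the new switch case runs the upgrade and then terminates instead of serving. A JDK-only sketch of that control flow is below; StartupOption here is a local stand-in enum, not Hadoop's.

    public class UpgradeOnlySketch {
      // Local stand-in for the subset of startup options touched by this change.
      enum StartupOption { REGULAR, UPGRADE, UPGRADEONLY }

      static boolean isUpgrade(StartupOption opt) {
        // Both options run the on-disk upgrade; only UPGRADE keeps serving afterwards.
        return opt == StartupOption.UPGRADE || opt == StartupOption.UPGRADEONLY;
      }

      static void start(StartupOption opt) {
        if (isUpgrade(opt)) {
          System.out.println("upgrading storage directories...");
        }
        if (opt == StartupOption.UPGRADEONLY) {
          System.out.println("upgrade finished, terminating without serving");
          return; // the real code calls terminate(0) at this point
        }
        System.out.println("starting RPC and HTTP servers");
      }

      public static void main(String[] args) {
        start(StartupOption.UPGRADE);
        start(StartupOption.UPGRADEONLY);
      }
    }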
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Fri Jul 25 20:33:09 2014
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -47,6 +50,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.RemotePeerFactory;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -55,6 +59,12 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
 import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
@@ -65,6 +75,7 @@ import org.apache.hadoop.net.NetworkTopo
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -92,7 +103,7 @@ import com.google.common.annotations.Vis
  *  factors of each file.
  */
 @InterfaceAudience.Private
-public class NamenodeFsck {
+public class NamenodeFsck implements DataEncryptionKeyFactory {
   public static final Log LOG = LogFactory.getLog(NameNode.class.getName());
   
   // return string marking fsck status
@@ -115,6 +126,7 @@ public class NamenodeFsck {
   private boolean showBlocks = false;
   private boolean showLocations = false;
   private boolean showRacks = false;
+  private boolean showprogress = false;
   private boolean showCorruptFileBlocks = false;
 
   /**
@@ -149,6 +161,7 @@ public class NamenodeFsck {
   private List<String> snapshottableDirs = null;
 
   private final BlockPlacementPolicy bpPolicy;
+  private final SaslDataTransferClient saslClient;
 
   /**
    * Filesystem checker.
@@ -175,6 +188,12 @@ public class NamenodeFsck {
         networktopology,
         namenode.getNamesystem().getBlockManager().getDatanodeManager()
         .getHost2DatanodeMap());
+    this.saslClient = new SaslDataTransferClient(
+      DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+      TrustedChannelResolver.getInstance(conf),
+      conf.getBoolean(
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
     
     for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
       String key = it.next();
@@ -185,6 +204,7 @@ public class NamenodeFsck {
       else if (key.equals("blocks")) { this.showBlocks = true; }
       else if (key.equals("locations")) { this.showLocations = true; }
       else if (key.equals("racks")) { this.showRacks = true; }
+      else if (key.equals("showprogress")) { this.showprogress = true; }
       else if (key.equals("openforwrite")) {this.showOpenFiles = true; }
       else if (key.equals("listcorruptfileblocks")) {
         this.showCorruptFileBlocks = true;
@@ -363,10 +383,13 @@ public class NamenodeFsck {
     } else if (showFiles) {
       out.print(path + " " + fileLen + " bytes, " +
         blocks.locatedBlockCount() + " block(s): ");
-    } else {
+    } else if (showprogress) {
       out.print('.');
     }
-    if (res.totalFiles % 100 == 0) { out.println(); out.flush(); }
+    if ((showprogress) && res.totalFiles % 100 == 0) {
+      out.println();
+      out.flush();
+    }
     int missing = 0;
     int corrupt = 0;
     long missize = 0;
@@ -616,15 +639,16 @@ public class NamenodeFsck {
             setConfiguration(namenode.conf).
             setRemotePeerFactory(new RemotePeerFactory() {
               @Override
-              public Peer newConnectedPeer(InetSocketAddress addr)
+              public Peer newConnectedPeer(InetSocketAddress addr,
+                  Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
                   throws IOException {
                 Peer peer = null;
                 Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
                 try {
                   s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
                   s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-                  peer = TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
-                        getDataEncryptionKey());
+                  peer = TcpPeerServer.peerFromSocketAndKey(saslClient, s,
+                        NamenodeFsck.this, blockToken, datanodeId);
                 } finally {
                   if (peer == null) {
                     IOUtils.closeQuietly(s);
@@ -663,7 +687,12 @@ public class NamenodeFsck {
       throw new Exception("Could not copy block data for " + lblock.getBlock());
     }
   }
-      
+
+  @Override
+  public DataEncryptionKey newDataEncryptionKey() throws IOException {
+    return namenode.getRpcServer().getDataEncryptionKey();
+  }
+
   /*
    * XXX (ab) See comment above for copyBlock().
    *

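Two independent changes land in NamenodeFsck above: block reads now go through a SaslDataTransferClient, with the fsck object itself supplying the data encryption key, and the per-file "." progress output is no longer printed by default, only when the new showprogress parameter is passed. A JDK-only sketch of the progress gating is below; the counter and writer are illustrative, not Hadoop's Result and output fields.

    import java.io.PrintWriter;

    public class FsckProgressSketch {
      private final PrintWriter out;
      private final boolean showProgress;
      private long totalFiles = 0;

      FsckProgressSketch(PrintWriter out, boolean showProgress) {
        this.out = out;
        this.showProgress = showProgress;
      }

      // Called once per checked file; mirrors the gated '.' printing in the hunk above.
      void fileChecked() {
        totalFiles++;
        if (showProgress) {
          out.print('.');
          if (totalFiles % 100 == 0) {  // break the line so long runs stay readable
            out.println();
            out.flush();
          }
        }
      }

      public static void main(String[] args) {
        PrintWriter out = new PrintWriter(System.out, true);
        FsckProgressSketch quiet = new FsckProgressSketch(out, false);
        FsckProgressSketch chatty = new FsckProgressSketch(out, true);
        for (int i = 0; i < 250; i++) {
          quiet.fileChecked();   // prints nothing
          chatty.fileChecked();  // prints a dot, newline every 100 files
        }
        out.flush();
      }
    }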
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrPermissionFilter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrPermissionFilter.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrPermissionFilter.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrPermissionFilter.java Fri Jul 25 20:33:09 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.XAttrHelpe
 import org.apache.hadoop.security.AccessControlException;
 
 import com.google.common.collect.Lists;
+import com.google.common.base.Preconditions;
 
 /**
  * There are four types of extended attributes <XAttr> defined by the
@@ -60,8 +61,20 @@ public class XAttrPermissionFilter {
     throw new AccessControlException("User doesn't have permission for xattr: "
         + XAttrHelper.getPrefixName(xAttr));
   }
-  
-  static List<XAttr> filterXAttrsForApi(FSPermissionChecker pc, 
+
+  static void checkPermissionForApi(FSPermissionChecker pc,
+                                    List<XAttr> xAttrs) throws AccessControlException {
+    Preconditions.checkArgument(xAttrs != null);
+    if (xAttrs.isEmpty()) {
+      return;
+    }
+
+    for (XAttr xAttr : xAttrs) {
+      checkPermissionForApi(pc, xAttr);
+    }
+  }
+
+  static List<XAttr> filterXAttrsForApi(FSPermissionChecker pc,
       List<XAttr> xAttrs) {
     assert xAttrs != null : "xAttrs can not be null";
     if (xAttrs == null || xAttrs.isEmpty()) {

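The new list overload above validates a non-null (possibly empty) list by delegating to the existing single-xattr check, so a request naming any inaccessible attribute is rejected as a whole rather than silently filtered. A JDK-only sketch of that delegation pattern follows; java.util.Objects stands in for Guava's Preconditions, and the "user." prefix rule is a simplified placeholder for the real namespace checks.

    import java.util.List;
    import java.util.Objects;

    public class XAttrCheckSketch {
      static class AccessControlException extends Exception {
        AccessControlException(String msg) { super(msg); }
      }

      // Simplified stand-in for the single-attribute permission check:
      // only "user."-prefixed names are allowed in this sketch.
      static void checkOne(String xAttrName) throws AccessControlException {
        if (!xAttrName.startsWith("user.")) {
          throw new AccessControlException(
              "User doesn't have permission for xattr: " + xAttrName);
        }
      }

      // Mirrors the new list overload: reject null, accept empty, check each entry.
      static void checkAll(List<String> xAttrNames) throws AccessControlException {
        Objects.requireNonNull(xAttrNames, "xAttrNames");
        for (String name : xAttrNames) {
          checkOne(name);
        }
      }

      public static void main(String[] args) throws Exception {
        checkAll(List.of());                            // fine: nothing to check
        checkAll(List.of("user.color", "user.owner"));  // fine
        try {
          checkAll(List.of("user.color", "system.raw")); // whole request rejected
        } catch (AccessControlException e) {
          System.out.println("rejected: " + e.getMessage());
        }
      }
    }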
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java Fri Jul 25 20:33:09 2014
@@ -19,24 +19,30 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.util.List;
+import java.util.Map;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
-import org.apache.hadoop.hdfs.server.namenode.INode;
-
-import com.google.common.collect.ImmutableList;
 
 /**
  * XAttrStorage is used to read and set xattrs for an inode.
  */
 @InterfaceAudience.Private
 public class XAttrStorage {
-  
+
+  private static final Map<String, String> internedNames = Maps.newHashMap();
+
   /**
    * Reads the existing extended attributes of an inode. If the 
    * inode does not have an <code>XAttr</code>, then this method
    * returns an empty list.
+   * <p/>
+   * Must be called while holding the FSDirectory read lock.
+   *
    * @param inode INode to read
    * @param snapshotId
    * @return List<XAttr> <code>XAttr</code> list. 
@@ -48,6 +54,9 @@ public class XAttrStorage {
   
   /**
    * Reads the existing extended attributes of an inode.
+   * <p/>
+   * Must be called while holding the FSDirectory read lock.
+   *
    * @param inode INode to read.
    * @return List<XAttr> <code>XAttr</code> list.
    */
@@ -58,6 +67,9 @@ public class XAttrStorage {
   
   /**
    * Update xattrs of inode.
+   * <p/>
+   * Must be called while holding the FSDirectory write lock.
+   * 
    * @param inode INode to update
    * @param xAttrs to update xAttrs.
    * @param snapshotId id of the latest snapshot of the inode
@@ -70,8 +82,24 @@ public class XAttrStorage {
       }
       return;
     }
-    
-    ImmutableList<XAttr> newXAttrs = ImmutableList.copyOf(xAttrs);
+    // Dedupe the xAttr name and save them into a new interned list
+    List<XAttr> internedXAttrs = Lists.newArrayListWithCapacity(xAttrs.size());
+    for (XAttr xAttr : xAttrs) {
+      final String name = xAttr.getName();
+      String internedName = internedNames.get(name);
+      if (internedName == null) {
+        internedName = name;
+        internedNames.put(internedName, internedName);
+      }
+      XAttr internedXAttr = new XAttr.Builder()
+          .setName(internedName)
+          .setNameSpace(xAttr.getNameSpace())
+          .setValue(xAttr.getValue())
+          .build();
+      internedXAttrs.add(internedXAttr);
+    }
+    // Save the list of interned xattrs
+    ImmutableList<XAttr> newXAttrs = ImmutableList.copyOf(internedXAttrs);
     if (inode.getXAttrFeature() != null) {
       inode.removeXAttrFeature(snapshotId);
     }

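The XAttrStorage hunk above dedupes xattr name strings through a shared map before the attributes are stored on the inode, so many inodes carrying the same attribute name end up sharing one String instance; the updated javadoc notes the map is protected by the FSDirectory lock. A JDK-only sketch of that interning pattern is below; the XAttr record is a simplified stand-in for Hadoop's XAttr and its Builder, and computeIfAbsent replaces the explicit get/put of the original.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class XAttrInternSketch {
      // Simplified stand-in for org.apache.hadoop.fs.XAttr.
      record XAttr(String name, byte[] value) {}

      private static final Map<String, String> INTERNED_NAMES = new HashMap<>();

      // Rebuild each attribute with a shared (interned) name instance.
      static List<XAttr> intern(List<XAttr> xAttrs) {
        List<XAttr> result = new ArrayList<>(xAttrs.size());
        for (XAttr x : xAttrs) {
          String interned = INTERNED_NAMES.computeIfAbsent(x.name(), n -> n);
          result.add(new XAttr(interned, x.value()));
        }
        return result;
      }

      public static void main(String[] args) {
        List<XAttr> a = intern(List.of(new XAttr(new String("user.color"), new byte[] {1})));
        List<XAttr> b = intern(List.of(new XAttr(new String("user.color"), new byte[] {2})));
        // Same name content, now literally the same String object:
        System.out.println(a.get(0).name() == b.get(0).name()); // true
      }
    }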
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java Fri Jul 25 20:33:09 2014
@@ -81,6 +81,7 @@ public class BootstrapStandby implements
   
   private boolean force = false;
   private boolean interactive = true;
+  private boolean skipSharedEditsCheck = false;
 
   // Exit/return codes.
   static final int ERR_CODE_FAILED_CONNECT = 2;
@@ -117,6 +118,8 @@ public class BootstrapStandby implements
         force = true;
       } else if ("-nonInteractive".equals(arg)) {
         interactive = false;
+      } else if ("-skipSharedEditsCheck".equals(arg)) {
+        skipSharedEditsCheck = true;
       } else {
         printUsage();
         throw new HadoopIllegalArgumentException(
@@ -127,7 +130,7 @@ public class BootstrapStandby implements
 
   private void printUsage() {
     System.err.println("Usage: " + this.getClass().getSimpleName() +
-        "[-force] [-nonInteractive]");
+        " [-force] [-nonInteractive] [-skipSharedEditsCheck]");
   }
   
   private NamenodeProtocol createNNProtocolProxy()
@@ -200,7 +203,7 @@ public class BootstrapStandby implements
 
       // Ensure that we have enough edits already in the shared directory to
       // start up from the last checkpoint on the active.
-      if (!checkLogsAvailableForRead(image, imageTxId, curTxId)) {
+      if (!skipSharedEditsCheck && !checkLogsAvailableForRead(image, imageTxId, curTxId)) {
         return ERR_CODE_LOGS_UNAVAILABLE;
       }
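Finally, BootstrapStandby above gains a -skipSharedEditsCheck flag: argument parsing accepts it, the usage string advertises it (and now starts the bracketed options with a space), and the verification that the shared edits directory can serve every transaction from the checkpoint forward is bypassed when the flag is set. A JDK-only sketch of that guarded check is below; the flag names come from the hunk, while the log-availability check and the exit-code value are placeholders.

    public class BootstrapStandbySketch {
      static final int ERR_CODE_LOGS_UNAVAILABLE = 6; // value is illustrative

      // Kept for parity with the real parser; unused in this sketch.
      private boolean force = false;
      private boolean interactive = true;
      private boolean skipSharedEditsCheck = false;

      void parseArgs(String[] args) {
        for (String arg : args) {
          switch (arg) {
            case "-force":                force = true; break;
            case "-nonInteractive":       interactive = false; break;
            case "-skipSharedEditsCheck": skipSharedEditsCheck = true; break;
            default:
              throw new IllegalArgumentException(
                  "Usage: BootstrapStandby [-force] [-nonInteractive] [-skipSharedEditsCheck]");
          }
        }
      }

      // Placeholder for checkLogsAvailableForRead(image, imageTxId, curTxId).
      boolean logsAvailableForRead(long imageTxId, long curTxId) {
        return false; // pretend the shared edits are missing
      }

      int bootstrap(long imageTxId, long curTxId) {
        // Short-circuit order matters: with the flag set, the check never runs.
        if (!skipSharedEditsCheck && !logsAvailableForRead(imageTxId, curTxId)) {
          return ERR_CODE_LOGS_UNAVAILABLE;
        }
        return 0;
      }

      public static void main(String[] args) {
        BootstrapStandbySketch bs = new BootstrapStandbySketch();
        bs.parseArgs(new String[] {"-skipSharedEditsCheck"});
        System.out.println(bs.bootstrap(100, 250)); // 0: the check was skipped
      }
    }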