Posted to hdfs-commits@hadoop.apache.org by vi...@apache.org on 2013/10/30 23:22:22 UTC

svn commit: r1537330 [4/11] - in /hadoop/common/branches/YARN-321/hadoop-hdfs-project: ./ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/ hadoop-hdfs-nfs/ hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/ hadoop-hdfs-nfs/sr...

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt Wed Oct 30 22:21:59 2013
@@ -62,6 +62,11 @@ endfunction()
 INCLUDE(CheckCSourceCompiles)
 CHECK_C_SOURCE_COMPILES("int main(void) { static __thread int i = 0; return 0; }" HAVE_BETTER_TLS)
 
+# Check if we need to link the dl library to get dlopen.
+# dlopen on Linux is in a separate library, but on FreeBSD it's in libc.
+INCLUDE(CheckLibraryExists)
+CHECK_LIBRARY_EXISTS(dl dlopen "" NEED_LINK_DL)
+
 find_package(JNI REQUIRED)
 if (NOT GENERATED_JAVAH)
     # Must identify where the generated headers have been placed
@@ -89,9 +94,13 @@ add_dual_library(hdfs
     main/native/libhdfs/jni_helper.c
     main/native/libhdfs/hdfs.c
 )
+if (NEED_LINK_DL)
+   set(LIB_DL dl)
+endif(NEED_LINK_DL)
+
 target_link_dual_libraries(hdfs
     ${JAVA_JVM_LIBRARY}
-    dl
+    ${LIB_DL}
     pthread
 )
 dual_output_directory(hdfs target/usr/local/lib)
@@ -142,6 +151,7 @@ target_link_libraries(test_native_mini_d
 )
 
 add_executable(test_libhdfs_threaded
+    main/native/libhdfs/expect.c
     main/native/libhdfs/test_libhdfs_threaded.c
 )
 target_link_libraries(test_libhdfs_threaded
@@ -150,6 +160,16 @@ target_link_libraries(test_libhdfs_threa
     pthread
 )
 
+add_executable(test_libhdfs_zerocopy
+    main/native/libhdfs/expect.c
+    main/native/libhdfs/test/test_libhdfs_zerocopy.c
+)
+target_link_libraries(test_libhdfs_zerocopy
+    hdfs
+    native_mini_dfs
+    pthread
+)
+
 IF(REQUIRE_LIBWEBHDFS)
     add_subdirectory(contrib/libwebhdfs)
 ENDIF(REQUIRE_LIBWEBHDFS)

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml Wed Oct 30 22:21:59 2013
@@ -36,8 +36,26 @@ http://maven.apache.org/xsd/maven-4.0.0.
     <hadoop.common.build.dir>${basedir}/../../../../../hadoop-common-project/hadoop-common/target</hadoop.common.build.dir>
   </properties>
 
+  <dependencyManagement>
+    <dependencies>
+      <!-- This is a really old version of netty, that gets privatized
+           via shading and hence it is not managed via a parent pom -->
+      <dependency>
+        <groupId>org.jboss.netty</groupId>
+        <artifactId>netty</artifactId>
+        <version>3.2.4.Final</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
   <dependencies>
     <dependency>
+      <groupId>org.jboss.netty</groupId>
+      <artifactId>netty</artifactId>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
       <groupId>commons-logging</groupId>
       <artifactId>commons-logging</artifactId>
       <scope>compile</scope>

Propchange: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs-config.cmd
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd Wed Oct 30 22:21:59 2013
@@ -47,7 +47,17 @@ if "%1" == "--config" (
       goto print_usage
   )
 
-  call :%hdfs-command% %hdfs-command-arguments%
+  set hdfscommands=dfs namenode secondarynamenode journalnode zkfc datanode dfsadmin haadmin fsck balancer jmxget oiv oev fetchdt getconf groups snapshotDiff lsSnapshottableDir
+  for %%i in ( %hdfscommands% ) do (
+    if %hdfs-command% == %%i set hdfscommand=true
+  )
+  if defined hdfscommand (
+    call :%hdfs-command%
+  ) else (
+    set CLASSPATH=%CLASSPATH%;%CD%
+    set CLASS=%hdfs-command%
+  )
+
   set java_arguments=%JAVA_HEAP_MAX% %HADOOP_OPTS% -classpath %CLASSPATH% %CLASS% %hdfs-command-arguments%
   call %JAVA% %java_arguments%
 
@@ -58,6 +68,11 @@ goto :eof
   set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_NAMENODE_OPTS%
   goto :eof
 
+:journalnode
+  set CLASS=org.apache.hadoop.hdfs.qjournal.server.JournalNode
+  set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_JOURNALNODE_OPTS%
+  goto :eof
+
 :zkfc
   set CLASS=org.apache.hadoop.hdfs.tools.DFSZKFailoverController
   set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_ZKFC_OPTS%
@@ -123,6 +138,14 @@ goto :eof
   set CLASS=org.apache.hadoop.hdfs.tools.GetGroups
   goto :eof
 
+:snapshotDiff
+  set CLASS=org.apache.hadoop.hdfs.tools.snapshot.SnapshotDiff
+  goto :eof
+
+:lsSnapshottableDir
+  set CLASS=org.apache.hadoop.hdfs.tools.snapshot.LsSnapshottableDir
+  goto :eof
+
 @rem This changes %1, %2 etc. Hence those cannot be used after calling this.
 :make_command_arguments
   if "%1" == "--config" (
@@ -153,9 +176,11 @@ goto :eof
   @echo   namenode -format     format the DFS filesystem
   @echo   secondarynamenode    run the DFS secondary namenode
   @echo   namenode             run the DFS namenode
+  @echo   journalnode          run the DFS journalnode
   @echo   zkfc                 run the ZK Failover Controller daemon
   @echo   datanode             run a DFS datanode
   @echo   dfsadmin             run a DFS admin client
+  @echo   haadmin              run a DFS HA admin client
   @echo   fsck                 run a DFS filesystem checking utility
   @echo   balancer             run a cluster balancing utility
   @echo   jmxget               get JMX exported values from NameNode or DataNode.
@@ -164,7 +189,10 @@ goto :eof
   @echo   fetchdt              fetch a delegation token from the NameNode
   @echo   getconf              get config values from configuration
   @echo   groups               get the groups which users belong to
-  @echo                        Use -help to see options
+  @echo   snapshotDiff         diff two snapshots of a directory or diff the
+  @echo                        current directory contents with a snapshot
+  @echo   lsSnapshottableDir   list all snapshottable dirs owned by the current user
+  @echo 						Use -help to see options
   @echo.
   @echo Most commands print help when invoked w/o parameters.
 

Propchange: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.cmd
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-dfs.cmd
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1519784-1537326
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1523400,1523875,1525828,1531125,1532468,1532857,1532932,1533917,1534426

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Wed Oct 30 22:21:59 2013
@@ -20,12 +20,16 @@ package org.apache.hadoop.hdfs;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 
 /**
  * A BlockReader is responsible for reading a single block
  * from a single datanode.
  */
 public interface BlockReader extends ByteBufferReadable {
+  
 
   /* same interface as inputStream java.io.InputStream#read()
    * used by DFSInputStream#read()
@@ -81,4 +85,14 @@ public interface BlockReader extends Byt
    *                      All short-circuit reads are also local.
    */
   boolean isShortCircuit();
+
+  /**
+   * Get a ClientMmap object for this BlockReader.
+   *
+   * @param curBlock      The current block.
+   * @return              The ClientMmap object, or null if mmap is not
+   *                      supported.
+   */
+  ClientMmap getClientMmap(LocatedBlock curBlock,
+        ClientMmapManager mmapManager);
 }
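
For context, a minimal usage sketch (not part of the patch above) of the new getClientMmap() contract, assuming a BlockReader, the current LocatedBlock and a ClientMmapManager are already available; ClientMmap#getMappedByteBuffer() and ClientMmap#unref() are used the same way they are in this change:

    import java.nio.MappedByteBuffer;

    import org.apache.hadoop.hdfs.BlockReader;
    import org.apache.hadoop.hdfs.client.ClientMmap;
    import org.apache.hadoop.hdfs.client.ClientMmapManager;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    class BlockReaderMmapSketch {
      /** Returns true if the block data could be served from a memory map. */
      static boolean readViaMmap(BlockReader reader, LocatedBlock curBlock,
          ClientMmapManager mmapManager) {
        ClientMmap mmap = reader.getClientMmap(curBlock, mmapManager);
        if (mmap == null) {
          return false;             // this reader does not support mmap
        }
        try {
          MappedByteBuffer buf = mmap.getMappedByteBuffer();
          // ... consume buf here ...
          return true;
        } finally {
          mmap.unref();             // release our reference when done
        }
      }
    }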

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Wed Oct 30 22:21:59 2013
@@ -22,11 +22,15 @@ import java.io.DataInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import org.apache.hadoop.conf.Configuration;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.util.DirectBufferPool;
 import org.apache.hadoop.io.IOUtils;
@@ -87,6 +91,8 @@ class BlockReaderLocal implements BlockR
   private final ExtendedBlock block;
   
   private final FileInputStreamCache fisCache;
+  private ClientMmap clientMmap;
+  private boolean mmapDisabled;
   
   private static int getSlowReadBufferNumChunks(int bufSize,
       int bytesPerChecksum) {
@@ -113,6 +119,8 @@ class BlockReaderLocal implements BlockR
     this.datanodeID = datanodeID;
     this.block = block;
     this.fisCache = fisCache;
+    this.clientMmap = null;
+    this.mmapDisabled = false;
 
     // read and handle the common header here. For now just a version
     checksumIn.getChannel().position(0);
@@ -487,6 +495,10 @@ class BlockReaderLocal implements BlockR
 
   @Override
   public synchronized void close() throws IOException {
+    if (clientMmap != null) {
+      clientMmap.unref();
+      clientMmap = null;
+    }
     if (fisCache != null) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("putting FileInputStream for " + filename +
@@ -534,4 +546,30 @@ class BlockReaderLocal implements BlockR
   public boolean isShortCircuit() {
     return true;
   }
+
+  @Override
+  public ClientMmap getClientMmap(LocatedBlock curBlock,
+      ClientMmapManager mmapManager) {
+    if (clientMmap == null) {
+      if (mmapDisabled) {
+        return null;
+      }
+      try {
+        clientMmap = mmapManager.fetch(datanodeID, block, dataIn);
+        if (clientMmap == null) {
+          mmapDisabled = true;
+          return null;
+        }
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted while setting up mmap for " + filename, e);
+        Thread.currentThread().interrupt();
+        return null;
+      } catch (IOException e) {
+        LOG.error("unable to set up mmap for " + filename, e);
+        mmapDisabled = true;
+        return null;
+      }
+    }
+    return clientMmap;
+  }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java Wed Oct 30 22:21:59 2013
@@ -28,6 +28,8 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -35,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.util.DirectBufferPool;
@@ -701,4 +704,10 @@ class BlockReaderLocalLegacy implements 
   public boolean isShortCircuit() {
     return true;
   }
+
+  @Override
+  public ClientMmap getClientMmap(LocatedBlock curBlock,
+      ClientMmapManager mmapManager) {
+    return null;
+  }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Wed Oct 30 22:21:59 2013
@@ -27,6 +27,9 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
@@ -44,9 +47,6 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
@@ -102,9 +102,11 @@ import org.apache.hadoop.fs.MD5MD5CRC32G
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -115,13 +117,13 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
@@ -146,6 +148,7 @@ import org.apache.hadoop.io.EnumSetWrita
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -206,7 +209,43 @@ public class DFSClient implements java.i
   private boolean shouldUseLegacyBlockReaderLocal;
   private final CachingStrategy defaultReadCachingStrategy;
   private final CachingStrategy defaultWriteCachingStrategy;
+  private ClientMmapManager mmapManager;
   
+  private static final ClientMmapManagerFactory MMAP_MANAGER_FACTORY =
+      new ClientMmapManagerFactory();
+
+  private static final class ClientMmapManagerFactory {
+    private ClientMmapManager mmapManager = null;
+    /**
+     * Tracks the number of users of mmapManager.
+     */
+    private int refcnt = 0;
+
+    synchronized ClientMmapManager get(Configuration conf) {
+      if (refcnt++ == 0) {
+        mmapManager = ClientMmapManager.fromConf(conf);
+      } else {
+        String mismatches = mmapManager.verifyConfigurationMatches(conf);
+        if (!mismatches.isEmpty()) {
+          LOG.warn("The ClientMmapManager settings you specified " +
+            "have been ignored because another thread created the " +
+            "ClientMmapManager first.  " + mismatches);
+        }
+      }
+      return mmapManager;
+    }
+    
+    synchronized void unref(ClientMmapManager mmapManager) {
+      if (this.mmapManager != mmapManager) {
+        throw new IllegalArgumentException();
+      }
+      if (--refcnt == 0) {
+        IOUtils.cleanup(LOG, mmapManager);
+        mmapManager = null;
+      }
+    }
+  }
+
   /**
    * DFSClient configuration 
    */
@@ -453,7 +492,11 @@ public class DFSClient implements java.i
   
   /** 
    * Create a new DFSClient connected to the given nameNodeUri or rpcNamenode.
-   * Exactly one of nameNodeUri or rpcNamenode must be null.
+   * If HA is enabled and a positive value is set for 
+   * {@link DFSConfigKeys#DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY} in the
+   * configuration, the DFSClient will use {@link LossyRetryInvocationHandler}
+   * as its RetryInvocationHandler. Otherwise one of nameNodeUri or rpcNamenode 
+   * must be null.
    */
   @VisibleForTesting
   public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
@@ -477,7 +520,23 @@ public class DFSClient implements java.i
     this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" + 
         DFSUtil.getRandom().nextInt()  + "_" + Thread.currentThread().getId();
     
-    if (rpcNamenode != null) {
+    int numResponseToDrop = conf.getInt(
+        DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
+        DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
+    NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
+    if (numResponseToDrop > 0) {
+      // This case is used for testing.
+      LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
+          + " is set to " + numResponseToDrop
+          + ", this hacked client will proactively drop responses");
+      proxyInfo = NameNodeProxies.createProxyWithLossyRetryHandler(conf,
+          nameNodeUri, ClientProtocol.class, numResponseToDrop);
+    }
+    
+    if (proxyInfo != null) {
+      this.dtService = proxyInfo.getDelegationTokenService();
+      this.namenode = proxyInfo.getProxy();
+    } else if (rpcNamenode != null) {
       // This case is used for testing.
       Preconditions.checkArgument(nameNodeUri == null);
       this.namenode = rpcNamenode;
@@ -485,9 +544,8 @@ public class DFSClient implements java.i
     } else {
       Preconditions.checkArgument(nameNodeUri != null,
           "null URI");
-      NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo =
-        NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class);
-      
+      proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
+          ClientProtocol.class);
       this.dtService = proxyInfo.getDelegationTokenService();
       this.namenode = proxyInfo.getProxy();
     }
@@ -515,8 +573,9 @@ public class DFSClient implements java.i
         new CachingStrategy(readDropBehind, readahead);
     this.defaultWriteCachingStrategy =
         new CachingStrategy(writeDropBehind, readahead);
+    this.mmapManager = MMAP_MANAGER_FACTORY.get(conf);
   }
-
+  
   /**
    * Return the socket addresses to use with each configured
    * local interface. Local interfaces may be specified by IP
@@ -719,9 +778,12 @@ public class DFSClient implements java.i
   
   /** Abort and release resources held.  Ignore all errors. */
   void abort() {
+    if (mmapManager != null) {
+      MMAP_MANAGER_FACTORY.unref(mmapManager);
+      mmapManager = null;
+    }
     clientRunning = false;
     closeAllFilesBeingWritten(true);
-
     try {
       // remove reference to this client and stop the renewer,
       // if there is no more clients under the renewer.
@@ -765,6 +827,10 @@ public class DFSClient implements java.i
    */
   @Override
   public synchronized void close() throws IOException {
+    if (mmapManager != null) {
+      MMAP_MANAGER_FACTORY.unref(mmapManager);
+      mmapManager = null;
+    }
     if(clientRunning) {
       closeAllFilesBeingWritten(false);
       clientRunning = false;
@@ -825,10 +891,15 @@ public class DFSClient implements java.i
     assert dtService != null;
     Token<DelegationTokenIdentifier> token =
       namenode.getDelegationToken(renewer);
-    token.setService(this.dtService);
 
-    LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(token));
+    if (token != null) {
+      token.setService(this.dtService);
+      LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(token));
+    } else {
+      LOG.info("Cannot get delegation token from " + renewer);
+    }
     return token;
+
   }
 
   /**
@@ -2493,4 +2564,9 @@ public class DFSClient implements java.i
   public CachingStrategy getDefaultWriteCachingStrategy() {
     return defaultWriteCachingStrategy;
   }
+
+  @VisibleForTesting
+  public ClientMmapManager getMmapManager() {
+    return mmapManager;
+  }
 }
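
As context for the ClientMmapManagerFactory added above, a simplified, self-contained sketch of the same refcounting idea (the first caller creates the shared instance, the last caller closes it); the real factory additionally warns when a later caller's configuration does not match the one already in use:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.concurrent.Callable;

    /** Simplified stand-in for the refcounted sharing pattern used above. */
    final class RefCountedHolder<T extends Closeable> {
      private T instance;
      private int refcnt;

      synchronized T get(Callable<T> creator) throws Exception {
        if (refcnt++ == 0) {
          instance = creator.call();     // first user creates the shared instance
        }
        return instance;
      }

      synchronized void unref(T released) throws IOException {
        if (released != instance) {
          throw new IllegalArgumentException("not the shared instance");
        }
        if (--refcnt == 0) {
          instance.close();              // last user tears it down
          instance = null;
        }
      }
    }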

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Wed Oct 30 22:21:59 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
 
 /** 
  * This class contains constants for configuration keys used
@@ -192,7 +193,10 @@ public class DFSConfigKeys extends Commo
   public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
   public static final String  DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive";
   public static final int     DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT = 1000;
-  
+
+  public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
+  public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;
+
   // Whether to enable datanode's stale state detection and usage for reads
   public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode";
   public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT = false;
@@ -267,6 +271,8 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces";
   public static final String  DFS_NAMENODE_AUDIT_LOGGERS_KEY = "dfs.namenode.audit.loggers";
   public static final String  DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME = "default";
+  public static final String  DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY = "dfs.namenode.audit.log.token.tracking.id";
+  public static final boolean DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT = false;
 
   // Much code in hdfs is not yet updated to use these keys.
   public static final String  DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries";
@@ -346,6 +352,8 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY = "dfs.block.access.token.lifetime";
   public static final long    DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT = 600L;
 
+  public static final String DFS_BLOCK_REPLICATOR_CLASSNAME_KEY = "dfs.block.replicator.classname";
+  public static final Class<BlockPlacementPolicyDefault> DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT = BlockPlacementPolicyDefault.class;
   public static final String  DFS_REPLICATION_MAX_KEY = "dfs.replication.max";
   public static final int     DFS_REPLICATION_MAX_DEFAULT = 512;
   public static final String  DFS_DF_INTERVAL_KEY = "dfs.df.interval";
@@ -371,6 +379,12 @@ public class DFSConfigKeys extends Commo
   public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT = 1024 * 1024;
   public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = "dfs.client.domain.socket.data.traffic";
   public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false;
+  public static final String DFS_CLIENT_MMAP_CACHE_SIZE = "dfs.client.mmap.cache.size";
+  public static final int DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT = 1024;
+  public static final String DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS = "dfs.client.mmap.cache.timeout.ms";
+  public static final long DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT  = 15 * 60 * 1000;
+  public static final String DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT = "dfs.client.mmap.cache.thread.runs.per.timeout";
+  public static final int DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT  = 4;
 
   // property for fsimage compression
   public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
@@ -497,9 +511,27 @@ public class DFSConfigKeys extends Commo
   public static final long DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 600000; // 10 minutes
   public static final String DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY = "dfs.namenode.retrycache.heap.percent";
   public static final float DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT = 0.03f;
+  
+  // The number of NN responses dropped by the client proactively in each RPC call.
+  // For testing the NN retry cache, we can set this property to a positive value.
+  public static final String DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY = "dfs.client.test.drop.namenode.response.number";
+  public static final int DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT = 0;
+
 
   // Hidden configuration undocumented in hdfs-site. xml
   // Timeout to wait for block receiver and responder thread to stop
   public static final String DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY = "dfs.datanode.xceiver.stop.timeout.millis";
   public static final long   DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT = 60000;
+
+  // WebHDFS retry policy
+  public static final String  DFS_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY = "dfs.http.client.retry.policy.enabled";
+  public static final boolean DFS_HTTP_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = false;
+  public static final String  DFS_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY = "dfs.http.client.retry.policy.spec";
+  public static final String  DFS_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT = "10000,6,60000,10"; //t1,n1,t2,n2,...
+  public static final String  DFS_HTTP_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY = "dfs.http.client.failover.max.attempts";
+  public static final int     DFS_HTTP_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT = 15;
+  public static final String  DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY = "dfs.http.client.failover.sleep.base.millis";
+  public static final int     DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT = 500;
+  public static final String  DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY = "dfs.http.client.failover.sleep.max.millis";
+  public static final int     DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT = 15000;
 }
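
A small sketch of how client code could read the new mmap cache keys (names and defaults exactly as added above), using the standard Configuration getters:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class MmapCacheConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        int cacheSize = conf.getInt(
            DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE,
            DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT);
        long timeoutMs = conf.getLong(
            DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS,
            DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT);
        System.out.println("mmap cache size=" + cacheSize
            + ", timeout=" + timeoutMs + "ms");
      }
    }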

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Wed Oct 30 22:21:59 2013
@@ -24,6 +24,7 @@ import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -36,11 +37,15 @@ import java.util.concurrent.ConcurrentHa
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.ByteBufferUtil;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CanSetReadahead;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
+import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.client.ClientMmap;
 import org.apache.hadoop.hdfs.net.DomainPeer;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
@@ -54,12 +59,14 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
+import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.IdentityHashStore;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -69,7 +76,8 @@ import com.google.common.annotations.Vis
  ****************************************************************/
 @InterfaceAudience.Private
 public class DFSInputStream extends FSInputStream
-implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
+implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
+    HasEnhancedByteBufferAccess {
   @VisibleForTesting
   static boolean tcpReadsDisabledForTesting = false;
   private final PeerCache peerCache;
@@ -87,17 +95,28 @@ implements ByteBufferReadable, CanSetDro
   private CachingStrategy cachingStrategy;
   private final ReadStatistics readStatistics = new ReadStatistics();
 
+  /**
+   * Track the ByteBuffers that we have handed out to readers.
+   * 
+   * The value type can be either ByteBufferPool or ClientMmap, depending on
+   * whether this is a memory-mapped buffer or not.
+   */
+  private final IdentityHashStore<ByteBuffer, Object>
+      extendedReadBuffers = new IdentityHashStore<ByteBuffer, Object>(0);
+
   public static class ReadStatistics {
     public ReadStatistics() {
       this.totalBytesRead = 0;
       this.totalLocalBytesRead = 0;
       this.totalShortCircuitBytesRead = 0;
+      this.totalZeroCopyBytesRead = 0;
     }
 
     public ReadStatistics(ReadStatistics rhs) {
       this.totalBytesRead = rhs.getTotalBytesRead();
       this.totalLocalBytesRead = rhs.getTotalLocalBytesRead();
       this.totalShortCircuitBytesRead = rhs.getTotalShortCircuitBytesRead();
+      this.totalZeroCopyBytesRead = rhs.getTotalZeroCopyBytesRead();
     }
 
     /**
@@ -123,6 +142,13 @@ implements ByteBufferReadable, CanSetDro
     public long getTotalShortCircuitBytesRead() {
       return totalShortCircuitBytesRead;
     }
+    
+    /**
+     * @return The total number of zero-copy bytes read.
+     */
+    public long getTotalZeroCopyBytesRead() {
+      return totalZeroCopyBytesRead;
+    }
 
     /**
      * @return The total number of bytes read which were not local.
@@ -145,12 +171,21 @@ implements ByteBufferReadable, CanSetDro
       this.totalLocalBytesRead += amt;
       this.totalShortCircuitBytesRead += amt;
     }
+
+    void addZeroCopyBytes(long amt) {
+      this.totalBytesRead += amt;
+      this.totalLocalBytesRead += amt;
+      this.totalShortCircuitBytesRead += amt;
+      this.totalZeroCopyBytesRead += amt;
+    }
     
     private long totalBytesRead;
 
     private long totalLocalBytesRead;
 
     private long totalShortCircuitBytesRead;
+
+    private long totalZeroCopyBytesRead;
   }
   
   private final FileInputStreamCache fileInputStreamCache;
@@ -368,7 +403,7 @@ implements ByteBufferReadable, CanSetDro
 
     //check offset
     if (offset < 0 || offset >= getFileLength()) {
-      throw new IOException("offset < 0 || offset > getFileLength(), offset="
+      throw new IOException("offset < 0 || offset >= getFileLength(), offset="
           + offset
           + ", updatePosition=" + updatePosition
           + ", locatedBlocks=" + locatedBlocks);
@@ -587,6 +622,20 @@ implements ByteBufferReadable, CanSetDro
     }
     dfsClient.checkOpen();
 
+    if (!extendedReadBuffers.isEmpty()) {
+      final StringBuilder builder = new StringBuilder();
+      extendedReadBuffers.visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() {
+        private String prefix = "";
+        @Override
+        public void accept(ByteBuffer k, Object v) {
+          builder.append(prefix).append(k);
+          prefix = ", ";
+        }
+      });
+      DFSClient.LOG.warn("closing file " + src + ", but there are still " +
+          "unreleased ByteBuffers allocated by read().  " +
+          "Please release " + builder.toString() + ".");
+    }
     if (blockReader != null) {
       blockReader.close();
       blockReader = null;
@@ -1393,4 +1442,100 @@ implements ByteBufferReadable, CanSetDro
     this.cachingStrategy.setDropBehind(dropBehind);
     closeCurrentBlockReader();
   }
+
+  @Override
+  public synchronized ByteBuffer read(ByteBufferPool bufferPool,
+      int maxLength, EnumSet<ReadOption> opts) 
+          throws IOException, UnsupportedOperationException {
+    assert(maxLength > 0);
+    if (((blockReader == null) || (blockEnd == -1)) &&
+          (pos < getFileLength())) {
+      /*
+       * If we don't have a blockReader, or the one we have has no more bytes
+       * left to read, we call seekToBlockSource to get a new blockReader and
+       * recalculate blockEnd.  Note that we assume we're not at EOF here
+       * (we check this above).
+       */
+      if ((!seekToBlockSource(pos)) || (blockReader == null)) {
+        throw new IOException("failed to allocate new BlockReader " +
+            "at position " + pos);
+      }
+    }
+    boolean canSkipChecksums = opts.contains(ReadOption.SKIP_CHECKSUMS);
+    if (canSkipChecksums) {
+      ByteBuffer buffer = tryReadZeroCopy(maxLength);
+      if (buffer != null) {
+        return buffer;
+      }
+    }
+    ByteBuffer buffer = ByteBufferUtil.
+        fallbackRead(this, bufferPool, maxLength);
+    if (buffer != null) {
+      extendedReadBuffers.put(buffer, bufferPool);
+    }
+    return buffer;
+  }
+
+  private synchronized ByteBuffer tryReadZeroCopy(int maxLength)
+      throws IOException {
+    // Java ByteBuffers can't be longer than 2 GB, because they use
+    // 4-byte signed integers to represent capacity, etc.
+    // So we can't mmap the parts of the block higher than the 2 GB offset.
+    // FIXME: we could work around this with multiple memory maps.
+    // See HDFS-5101.
+    long blockEnd32 = Math.min(Integer.MAX_VALUE, blockEnd);
+    long curPos = pos;
+    long blockLeft = blockEnd32 - curPos + 1;
+    if (blockLeft <= 0) {
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
+          curPos + " of " + src + "; blockLeft = " + blockLeft +
+          "; blockEnd32 = " + blockEnd32 + ", blockEnd = " + blockEnd +
+          "; maxLength = " + maxLength);
+      }
+      return null;
+    }
+    int length = Math.min((int)blockLeft, maxLength);
+    long blockStartInFile = currentLocatedBlock.getStartOffset();
+    long blockPos = curPos - blockStartInFile;
+    long limit = blockPos + length;
+    ClientMmap clientMmap =
+        blockReader.getClientMmap(currentLocatedBlock,
+            dfsClient.getMmapManager());
+    if (clientMmap == null) {
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
+          curPos + " of " + src + "; BlockReader#getClientMmap returned " +
+          "null.");
+      }
+      return null;
+    }
+    seek(pos + length);
+    ByteBuffer buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
+    buffer.position((int)blockPos);
+    buffer.limit((int)limit);
+    clientMmap.ref();
+    extendedReadBuffers.put(buffer, clientMmap);
+    readStatistics.addZeroCopyBytes(length);
+    if (DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("readZeroCopy read " + maxLength + " bytes from " +
+          "offset " + curPos + " via the zero-copy read path.  " +
+          "blockEnd = " + blockEnd);
+    }
+    return buffer;
+  }
+
+  @Override
+  public synchronized void releaseBuffer(ByteBuffer buffer) {
+    Object val = extendedReadBuffers.remove(buffer);
+    if (val == null) {
+      throw new IllegalArgumentException("tried to release a buffer " +
+          "that was not created by this stream, " + buffer);
+    }
+    if (val instanceof ClientMmap) {
+      ((ClientMmap)val).unref();
+    } else if (val instanceof ByteBufferPool) {
+      ((ByteBufferPool)val).putBuffer(buffer);
+    }
+  }
 }
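
For context, a minimal sketch of the new zero-copy read path from the caller's side, assuming the file already exists and that ElasticByteBufferPool (an org.apache.hadoop.io implementation of ByteBufferPool) is available as the fallback pool; buffers handed out by read() must be returned with releaseBuffer(), which is what the close() warning above enforces:

    import java.nio.ByteBuffer;
    import java.util.EnumSet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.ReadOption;
    import org.apache.hadoop.io.ElasticByteBufferPool;

    public class ZeroCopyReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        ElasticByteBufferPool pool = new ElasticByteBufferPool();
        FSDataInputStream in = fs.open(new Path("/tmp/example"));  // hypothetical path
        try {
          // SKIP_CHECKSUMS allows the mmap fast path; otherwise fallbackRead is used.
          ByteBuffer buf = in.read(pool, 1024 * 1024,
              EnumSet.of(ReadOption.SKIP_CHECKSUMS));
          if (buf != null) {
            // ... consume buf ...
            in.releaseBuffer(buf);  // return the buffer to the stream
          }
        } finally {
          in.close();
        }
      }
    }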

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Wed Oct 30 22:21:59 2013
@@ -38,6 +38,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CanSetDropBehind;
@@ -46,7 +47,6 @@ import org.apache.hadoop.fs.FSOutputSumm
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Syncable;
-import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
@@ -85,7 +85,6 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
-import org.mortbay.log.Log;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
@@ -138,10 +137,10 @@ public class DFSOutputStream extends FSO
   private long currentSeqno = 0;
   private long lastQueuedSeqno = -1;
   private long lastAckedSeqno = -1;
-  private long bytesCurBlock = 0; // bytes writen in current block
+  private long bytesCurBlock = 0; // bytes written in current block
   private int packetSize = 0; // write packet size, not including the header.
   private int chunksPerPacket = 0;
-  private volatile IOException lastException = null;
+  private final AtomicReference<IOException> lastException = new AtomicReference<IOException>();
   private long artificialSlowdown = 0;
   private long lastFlushOffset = 0; // offset when flush was invoked
   //persist blocks on namenode
@@ -466,8 +465,7 @@ public class DFSOutputStream extends FSO
           }
         }
 
-        Packet one = null;
-
+        Packet one;
         try {
           // process datanode IO errors if any
           boolean doSleep = false;
@@ -511,7 +509,7 @@ public class DFSOutputStream extends FSO
             if(DFSClient.LOG.isDebugEnabled()) {
               DFSClient.LOG.debug("Allocating new block");
             }
-            nodes = nextBlockOutputStream(src);
+            nodes = nextBlockOutputStream();
             initDataStreaming();
           } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
             if(DFSClient.LOG.isDebugEnabled()) {
@@ -575,9 +573,6 @@ public class DFSOutputStream extends FSO
           }
           lastPacket = Time.now();
           
-          if (one.isHeartbeatPacket()) {  //heartbeat packet
-          }
-          
           // update bytesSent
           long tmpBytesSent = one.getLastByteOffsetBlock();
           if (bytesSent < tmpBytesSent) {
@@ -695,7 +690,7 @@ public class DFSOutputStream extends FSO
     }
 
     //
-    // Processes reponses from the datanodes.  A packet is removed 
+    // Processes responses from the datanodes.  A packet is removed
     // from the ackQueue when its response arrives.
     //
     private class ResponseProcessor extends Daemon {
@@ -737,18 +732,18 @@ public class DFSOutputStream extends FSO
             }
             
             assert seqno != PipelineAck.UNKOWN_SEQNO : 
-              "Ack for unkown seqno should be a failed ack: " + ack;
+              "Ack for unknown seqno should be a failed ack: " + ack;
             if (seqno == Packet.HEART_BEAT_SEQNO) {  // a heartbeat ack
               continue;
             }
 
             // a success ack for a data packet
-            Packet one = null;
+            Packet one;
             synchronized (dataQueue) {
               one = ackQueue.getFirst();
             }
             if (one.seqno != seqno) {
-              throw new IOException("Responseprocessor: Expecting seqno " +
+              throw new IOException("ResponseProcessor: Expecting seqno " +
                                     " for block " + block +
                                     one.seqno + " but received " + seqno);
             }
@@ -814,8 +809,8 @@ public class DFSOutputStream extends FSO
         if (++pipelineRecoveryCount > 5) {
           DFSClient.LOG.warn("Error recovering pipeline for writing " +
               block + ". Already retried 5 times for the same packet.");
-          lastException = new IOException("Failing write. Tried pipeline " +
-              "recovery 5 times without success.");
+          lastException.set(new IOException("Failing write. Tried pipeline " +
+              "recovery 5 times without success."));
           streamerClosed = true;
           return false;
         }
@@ -1005,8 +1000,8 @@ public class DFSOutputStream extends FSO
             }
           }
           if (nodes.length <= 1) {
-            lastException = new IOException("All datanodes " + pipelineMsg
-                + " are bad. Aborting...");
+            lastException.set(new IOException("All datanodes " + pipelineMsg
+                + " are bad. Aborting..."));
             streamerClosed = true;
             return false;
           }
@@ -1021,7 +1016,7 @@ public class DFSOutputStream extends FSO
               newnodes.length-errorIndex);
           nodes = newnodes;
           hasError = false;
-          lastException = null;
+          lastException.set(null);
           errorIndex = -1;
         }
 
@@ -1057,7 +1052,7 @@ public class DFSOutputStream extends FSO
      * Must get block ID and the IDs of the destinations from the namenode.
      * Returns the list of target datanodes.
      */
-    private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
+    private DatanodeInfo[] nextBlockOutputStream() throws IOException {
       LocatedBlock lb = null;
       DatanodeInfo[] nodes = null;
       int count = dfsClient.getConf().nBlockWriteRetry;
@@ -1065,7 +1060,7 @@ public class DFSOutputStream extends FSO
       ExtendedBlock oldBlock = block;
       do {
         hasError = false;
-        lastException = null;
+        lastException.set(null);
         errorIndex = -1;
         success = false;
 
@@ -1107,6 +1102,11 @@ public class DFSOutputStream extends FSO
     //
     private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
         boolean recoveryFlag) {
+      if (nodes.length == 0) {
+        DFSClient.LOG.info("nodes are empty for write pipeline of block "
+            + block);
+        return false;
+      }
       Status pipelineStatus = SUCCESS;
       String firstBadLink = "";
       if (DFSClient.LOG.isDebugEnabled()) {
@@ -1215,8 +1215,7 @@ public class DFSOutputStream extends FSO
     }
 
     private LocatedBlock locateFollowingBlock(long start,
-        DatanodeInfo[] excludedNodes) 
-        throws IOException, UnresolvedLinkException {
+        DatanodeInfo[] excludedNodes)  throws IOException {
       int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
       long sleeptime = 400;
       while (true) {
@@ -1279,9 +1278,7 @@ public class DFSOutputStream extends FSO
     }
 
     private void setLastException(IOException e) {
-      if (lastException == null) {
-        lastException = e;
-      }
+      lastException.compareAndSet(null, e);
     }
   }
 
@@ -1289,7 +1286,7 @@ public class DFSOutputStream extends FSO
    * Create a socket for a write pipeline
    * @param first the first datanode 
    * @param length the pipeline length
-   * @param client
+   * @param client client
    * @return the socket connected to the first datanode
    */
   static Socket createSocketForPipeline(final DatanodeInfo first,
@@ -1313,7 +1310,7 @@ public class DFSOutputStream extends FSO
 
   protected void checkClosed() throws IOException {
     if (closed) {
-      IOException e = lastException;
+      IOException e = lastException.get();
       throw e != null ? e : new ClosedChannelException();
     }
   }
@@ -1469,6 +1466,7 @@ public class DFSOutputStream extends FSO
 
   private void waitAndQueueCurrentPacket() throws IOException {
     synchronized (dataQueue) {
+      try {
       // If queue is full, then wait till we have enough space
       while (!closed && dataQueue.size() + ackQueue.size()  > MAX_PACKETS) {
         try {
@@ -1480,13 +1478,15 @@ public class DFSOutputStream extends FSO
           //
           // Rather than wait around for space in the queue, we should instead try to
           // return to the caller as soon as possible, even though we slightly overrun
-          // the MAX_PACKETS iength.
+          // the MAX_PACKETS length.
           Thread.currentThread().interrupt();
           break;
         }
       }
       checkClosed();
       queueCurrentPacket();
+      } catch (ClosedChannelException e) {
+      }
     }
   }
 
@@ -1704,7 +1704,7 @@ public class DFSOutputStream extends FSO
         }
       }
       // If 1) any new blocks were allocated since the last flush, or 2) to
-      // update length in NN is requried, then persist block locations on
+      // update length in NN is required, then persist block locations on
       // namenode.
       if (persistBlocks.getAndSet(false) || updateLength) {
         try {
@@ -1735,7 +1735,7 @@ public class DFSOutputStream extends FSO
       DFSClient.LOG.warn("Error while syncing", e);
       synchronized (this) {
         if (!closed) {
-          lastException = new IOException("IOException flush:" + e);
+          lastException.set(new IOException("IOException flush:" + e));
           closeThreads(true);
         }
       }
@@ -1793,21 +1793,25 @@ public class DFSOutputStream extends FSO
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("Waiting for ack for: " + seqno);
     }
-    synchronized (dataQueue) {
-      while (!closed) {
-        checkClosed();
-        if (lastAckedSeqno >= seqno) {
-          break;
-        }
-        try {
-          dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue
-        } catch (InterruptedException ie) {
-          throw new InterruptedIOException(
-            "Interrupted while waiting for data to be acknowledged by pipeline");
+    try {
+      synchronized (dataQueue) {
+        while (!closed) {
+          checkClosed();
+          if (lastAckedSeqno >= seqno) {
+            break;
+          }
+          try {
+            dataQueue.wait(1000); // when we receive an ack, we notify on
+                                  // dataQueue
+          } catch (InterruptedException ie) {
+            throw new InterruptedIOException(
+                "Interrupted while waiting for data to be acknowledged by pipeline");
+          }
         }
       }
+      checkClosed();
+    } catch (ClosedChannelException e) {
     }
-    checkClosed();
   }
 
   private synchronized void start() {
@@ -1853,7 +1857,7 @@ public class DFSOutputStream extends FSO
   @Override
   public synchronized void close() throws IOException {
     if (closed) {
-      IOException e = lastException;
+      IOException e = lastException.getAndSet(null);
       if (e == null)
         return;
       else
@@ -1880,6 +1884,7 @@ public class DFSOutputStream extends FSO
       closeThreads(false);
       completeFile(lastBlock);
       dfsClient.endFileLease(src);
+    } catch (ClosedChannelException e) {
     } finally {
       closed = true;
     }
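
As a side note on the lastException change above, a tiny sketch of why the AtomicReference form is preferable to the old volatile check-then-set: compareAndSet(null, e) atomically records only the first failure, and a ClosedChannelException fills in when nothing was recorded:

    import java.io.IOException;
    import java.nio.channels.ClosedChannelException;
    import java.util.concurrent.atomic.AtomicReference;

    class FirstExceptionSketch {
      private final AtomicReference<IOException> lastException =
          new AtomicReference<IOException>();
      private volatile boolean closed = false;

      void setLastException(IOException e) {
        // Only the first exception wins; concurrent callers cannot overwrite it.
        lastException.compareAndSet(null, e);
      }

      void checkClosed() throws IOException {
        if (closed) {
          IOException e = lastException.get();
          throw (e != null) ? e : new ClosedChannelException();
        }
      }
    }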

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Wed Oct 30 22:21:59 2013
@@ -38,6 +38,7 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.SecureRandom;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -599,6 +600,48 @@ public class DFSUtil {
       Configuration conf) {
     return getAddresses(conf, null, DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
   }
+
+  /**
+   * Returns list of InetSocketAddress corresponding to HA NN HTTP addresses from
+   * the configuration.
+   *
+   * @param conf configuration
+   * @return list of InetSocketAddresses
+   */
+  public static Map<String, Map<String, InetSocketAddress>> getHaNnHttpAddresses(
+      Configuration conf) {
+    return getAddresses(conf, null, DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
+  }
+
+  /**
+   * Resolve an HDFS URL into real InetSocketAddress. It works like a DNS resolver
+   * when the URL points to a non-HA cluster. When the URL points to an HA
+   * cluster, the resolver further resolves the logical name (i.e., the authority
+   * in the URL) into real namenode addresses.
+   */
+  public static InetSocketAddress[] resolve(URI uri, int schemeDefaultPort,
+      Configuration conf) throws IOException {
+    ArrayList<InetSocketAddress> ret = new ArrayList<InetSocketAddress>();
+
+    if (!HAUtil.isLogicalUri(conf, uri)) {
+      InetSocketAddress addr = NetUtils.createSocketAddr(uri.getAuthority(),
+          schemeDefaultPort);
+      ret.add(addr);
+
+    } else {
+      Map<String, Map<String, InetSocketAddress>> addresses = DFSUtil
+          .getHaNnHttpAddresses(conf);
+
+      for (Map<String, InetSocketAddress> addrs : addresses.values()) {
+        for (InetSocketAddress addr : addrs.values()) {
+          ret.add(addr);
+        }
+      }
+    }
+
+    InetSocketAddress[] r = new InetSocketAddress[ret.size()];
+    return ret.toArray(r);
+  }
   
   /**
    * Returns list of InetSocketAddress corresponding to  backup node rpc 
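
A rough usage sketch for the new DFSUtil.resolve() helper (not from the
patch); the logical URI and the 50070 fallback port are illustrative values.

    import java.net.InetSocketAddress;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSUtil;

    public class ResolveExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        URI uri = URI.create("hdfs://mycluster"); // logical HA name or host:port
        // A non-HA authority behaves like a plain DNS lookup; a logical HA
        // name expands to every configured namenode HTTP address.
        for (InetSocketAddress addr : DFSUtil.resolve(uri, 50070, conf)) {
          System.out.println(addr);
        }
      }
    }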

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Wed Oct 30 22:21:59 2013
@@ -713,6 +713,7 @@ public class DistributedFileSystem exten
   protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path p,
       final PathFilter filter)
   throws IOException {
+    final Path absF = fixRelativePart(p);
     return new RemoteIterator<LocatedFileStatus>() {
       private DirectoryListing thisListing;
       private int i;
@@ -722,7 +723,7 @@ public class DistributedFileSystem exten
       { // initializer
         // Fully resolve symlinks in path first to avoid additional resolution
         // round-trips as we fetch more batches of listings
-        src = getPathName(resolvePath(p));
+        src = getPathName(resolvePath(absF));
         // fetch the first batch of entries in the directory
         thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME, true);
         statistics.incrementReadOps(1);
@@ -736,7 +737,7 @@ public class DistributedFileSystem exten
         while (curStat == null && hasNextNoFilter()) {
           LocatedFileStatus next = 
               ((HdfsLocatedFileStatus)thisListing.getPartialListing()[i++])
-              .makeQualifiedLocated(getUri(), p);
+              .makeQualifiedLocated(getUri(), absF);
           if (filter.accept(next.getPath())) {
             curStat = next;
           }
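
An illustrative caller for the fixRelativePart() change above (not from the
patch): a bare relative path such as "data" is now qualified against the
client's working directory before the batched listing RPCs are issued. The
path and configuration are examples only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;

    public class ListRelativeExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // Relative path: with the fix it resolves against the working
        // directory instead of failing later inside the iterator.
        RemoteIterator<LocatedFileStatus> it =
            fs.listLocatedStatus(new Path("data"));
        while (it.hasNext()) {
          System.out.println(it.next().getPath());
        }
      }
    }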

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java Wed Oct 30 22:21:59 2013
@@ -17,15 +17,9 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -41,11 +35,17 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HA_DT_SERVICE_PREFIX;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HA_DT_SERVICE_PREFIX;
 
 public class HAUtil {
   
@@ -265,10 +265,15 @@ public class HAUtil {
         tokenSelector.selectToken(haService, ugi.getTokens());
     if (haToken != null) {
       for (InetSocketAddress singleNNAddr : nnAddrs) {
+        // this is a minor hack to prevent physical HA tokens from being
+        // exposed to the user via UGI.getCredentials(), otherwise these
+        // cloned tokens may be inadvertently propagated to jobs
         Token<DelegationTokenIdentifier> specificToken =
-            new Token<DelegationTokenIdentifier>(haToken);
+            new Token.PrivateToken<DelegationTokenIdentifier>(haToken);
         SecurityUtil.setTokenService(specificToken, singleNNAddr);
-        ugi.addToken(specificToken);
+        Text alias =
+            new Text(HA_DT_SERVICE_PREFIX + "//" + specificToken.getService());
+        ugi.addToken(alias, specificToken);
         LOG.debug("Mapped HA service delegation token for logical URI " +
             haUri + " to namenode " + singleNNAddr);
       }
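
A hedged sketch of the token-cloning idea above: each physical namenode gets
a private copy of the logical HA token, added to the UGI under a distinct
alias so UGI.getCredentials() does not leak the clones into submitted jobs.
The helper name and the literal "ha-hdfs:" prefix (standing in for
HA_DT_SERVICE_PREFIX) are illustrative.

    import java.net.InetSocketAddress;

    import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.SecurityUtil;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.token.Token;

    class HaTokenCloneSketch {
      static void cloneForNamenode(UserGroupInformation ugi,
          Token<DelegationTokenIdentifier> haToken, InetSocketAddress nnAddr) {
        // Private copy: usable by the RPC layer but hidden from getCredentials().
        Token<DelegationTokenIdentifier> clone =
            new Token.PrivateToken<DelegationTokenIdentifier>(haToken);
        SecurityUtil.setTokenService(clone, nnAddr); // rewrite service to host:port
        Text alias = new Text("ha-hdfs://" + clone.getService());
        ugi.addToken(alias, clone);
      }
    }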

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Wed Oct 30 22:21:59 2013
@@ -17,10 +17,18 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.HashMap;
@@ -48,6 +56,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
@@ -144,6 +153,63 @@ public class NameNodeProxies {
       return new ProxyAndInfo<T>(proxy, dtService);
     }
   }
+  
+  /**
+   * Generate a dummy namenode proxy instance that utilizes our hacked
+   * {@link LossyRetryInvocationHandler}. A proxy instance generated using this
+   * method will proactively drop RPC responses. Currently this method only
+   * supports an HA setup. null will be returned if the given configuration is
+   * not for HA.
+   * 
+   * @param config the configuration containing the required IPC
+   *        properties, client failover configurations, etc.
+   * @param nameNodeUri the URI pointing either to a specific NameNode
+   *        or to a logical nameservice.
+   * @param xface the IPC interface which should be created
+   * @param numResponseToDrop The number of responses to drop for each RPC call
+   * @return an object containing both the proxy and the associated
+   *         delegation token service it corresponds to. Will return null if the
+   *         given configuration does not support HA.
+   * @throws IOException if there is an error creating the proxy
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
+      Configuration config, URI nameNodeUri, Class<T> xface,
+      int numResponseToDrop) throws IOException {
+    Preconditions.checkArgument(numResponseToDrop > 0);
+    Class<FailoverProxyProvider<T>> failoverProxyProviderClass = 
+        getFailoverProxyProviderClass(config, nameNodeUri, xface);
+    if (failoverProxyProviderClass != null) { // HA case
+      FailoverProxyProvider<T> failoverProxyProvider = 
+          createFailoverProxyProvider(config, failoverProxyProviderClass, 
+              xface, nameNodeUri);
+      int delay = config.getInt(
+          DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
+          DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
+      int maxCap = config.getInt(
+          DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY,
+          DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT);
+      int maxFailoverAttempts = config.getInt(
+          DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
+          DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
+      InvocationHandler dummyHandler = new LossyRetryInvocationHandler<T>(
+              numResponseToDrop, failoverProxyProvider,
+              RetryPolicies.failoverOnNetworkException(
+                  RetryPolicies.TRY_ONCE_THEN_FAIL, 
+                  Math.max(numResponseToDrop + 1, maxFailoverAttempts), delay, 
+                  maxCap));
+      
+      T proxy = (T) Proxy.newProxyInstance(
+          failoverProxyProvider.getInterface().getClassLoader(),
+          new Class[] { xface }, dummyHandler);
+      Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri);
+      return new ProxyAndInfo<T>(proxy, dtService);
+    } else {
+      LOG.warn("Currently, creating a proxy using " +
+          "LossyRetryInvocationHandler requires an NN HA setup");
+      return null;
+    }
+  }
 
   /**
    * Creates an explicitly non-HA-enabled proxy object. Most of the time you
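
An illustrative call site for createProxyWithLossyRetryHandler() (not from
the patch), mainly useful in tests that need to simulate dropped RPC
responses against an HA cluster. The logical URI and drop count are made-up
values; the method returns null outside an HA configuration.

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.NameNodeProxies;
    import org.apache.hadoop.hdfs.NameNodeProxies.ProxyAndInfo;
    import org.apache.hadoop.hdfs.protocol.ClientProtocol;

    public class LossyProxyExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        URI nnUri = URI.create("hdfs://mycluster"); // logical HA nameservice
        ProxyAndInfo<ClientProtocol> info =
            NameNodeProxies.createProxyWithLossyRetryHandler(
                conf, nnUri, ClientProtocol.class, 2); // drop 2 responses per call
        if (info != null) {
          ClientProtocol nn = info.getProxy();
          System.out.println(nn.getServerDefaults()); // retried under the hood
        }
      }
    }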

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Wed Oct 30 22:21:59 2013
@@ -27,9 +27,12 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -485,4 +488,10 @@ public class RemoteBlockReader extends F
   public boolean isShortCircuit() {
     return false;
   }
+
+  @Override
+  public ClientMmap getClientMmap(LocatedBlock curBlock,
+      ClientMmapManager mmapManager) {
+    return null;
+  }
 }
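
A minimal sketch of the contract implemented above: a null ClientMmap means
"no zero-copy path for this reader", so callers fall back to a normal
buffered read. The reader interface below is a stand-in, not the real
BlockReader API.

    import java.io.IOException;
    import java.nio.ByteBuffer;

    interface SketchBlockReader {
      ByteBuffer getZeroCopyBuffer();           // returns null when unsupported
      int read(ByteBuffer dst) throws IOException;
    }

    class ZeroCopyFallback {
      static ByteBuffer readBlock(SketchBlockReader reader, int len)
          throws IOException {
        ByteBuffer mapped = reader.getZeroCopyBuffer();
        if (mapped != null) {
          return mapped;                        // zero-copy: reuse the mapping
        }
        ByteBuffer copy = ByteBuffer.allocate(len);
        while (copy.hasRemaining() && reader.read(copy) > 0) {
          // keep filling until the requested length (or the stream) is exhausted
        }
        copy.flip();
        return copy;
      }
    }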

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Wed Oct 30 22:21:59 2013
@@ -29,9 +29,12 @@ import java.nio.channels.ReadableByteCha
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
@@ -40,7 +43,6 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@@ -451,4 +453,10 @@ public class RemoteBlockReader2  impleme
   public boolean isShortCircuit() {
     return false;
   }
+
+  @Override
+  public ClientMmap getClientMmap(LocatedBlock curBlock,
+      ClientMmapManager manager) {
+    return null;
+  }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java Wed Oct 30 22:21:59 2013
@@ -43,6 +43,7 @@ public class DatanodeID implements Compa
   private String storageID;  // unique per cluster storageID
   private int xferPort;      // data streaming port
   private int infoPort;      // info server port
+  private int infoSecurePort; // secure info server port
   private int ipcPort;       // IPC server port
 
   public DatanodeID(DatanodeID from) {
@@ -51,6 +52,7 @@ public class DatanodeID implements Compa
         from.getStorageID(),
         from.getXferPort(),
         from.getInfoPort(),
+        from.getInfoSecurePort(),
         from.getIpcPort());
     this.peerHostName = from.getPeerHostName();
   }
@@ -65,12 +67,13 @@ public class DatanodeID implements Compa
    * @param ipcPort ipc server port
    */
   public DatanodeID(String ipAddr, String hostName, String storageID,
-      int xferPort, int infoPort, int ipcPort) {
+      int xferPort, int infoPort, int infoSecurePort, int ipcPort) {
     this.ipAddr = ipAddr;
     this.hostName = hostName;
     this.storageID = storageID;
     this.xferPort = xferPort;
     this.infoPort = infoPort;
+    this.infoSecurePort = infoSecurePort;
     this.ipcPort = ipcPort;
   }
   
@@ -129,6 +132,13 @@ public class DatanodeID implements Compa
   }
 
   /**
+   * @return IP:infoSecurePort string
+   */
+  public String getInfoSecureAddr() {
+    return ipAddr + ":" + infoSecurePort;
+  }
+
+  /**
    * @return hostname:xferPort
    */
   public String getXferAddrWithHostname() {
@@ -180,6 +190,13 @@ public class DatanodeID implements Compa
   }
 
   /**
+   * @return infoSecurePort (the port at which the HTTPS server is bound)
+   */
+  public int getInfoSecurePort() {
+    return infoSecurePort;
+  }
+
+  /**
    * @return ipcPort (the port at which the IPC server bound to)
    */
   public int getIpcPort() {
@@ -218,13 +235,14 @@ public class DatanodeID implements Compa
     peerHostName = nodeReg.getPeerHostName();
     xferPort = nodeReg.getXferPort();
     infoPort = nodeReg.getInfoPort();
+    infoSecurePort = nodeReg.getInfoSecurePort();
     ipcPort = nodeReg.getIpcPort();
   }
     
   /**
    * Compare based on data transfer address.
    *
-   * @param that
+   * @param that datanode to compare with
    * @return as specified by Comparable
    */
   @Override
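
An example construction with the widened DatanodeID constructor; every value
below is made up for illustration.

    import org.apache.hadoop.hdfs.protocol.DatanodeID;

    public class DatanodeIdExample {
      public static void main(String[] args) {
        DatanodeID dn = new DatanodeID(
            "10.0.0.7",          // ipAddr
            "dn7.example.com",   // hostName
            "DS-1234",           // storageID
            50010,               // xferPort
            50075,               // infoPort (HTTP)
            50475,               // infoSecurePort (HTTPS), the new field
            50020);              // ipcPort
        System.out.println(dn.getInfoSecureAddr()); // 10.0.0.7:50475
      }
    }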

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Wed Oct 30 22:21:59 2013
@@ -17,10 +17,6 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
-import static org.apache.hadoop.hdfs.DFSUtil.percent2String;
-
-import java.util.Date;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -32,6 +28,10 @@ import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 
+import java.util.Date;
+
+import static org.apache.hadoop.hdfs.DFSUtil.percent2String;
+
 /** 
  * This class extends the primary identifier of a Datanode with ephemeral
  * state, eg usage information, current administrative state, and the
@@ -108,18 +108,21 @@ public class DatanodeInfo extends Datano
       final long capacity, final long dfsUsed, final long remaining,
       final long blockPoolUsed, final long lastUpdate, final int xceiverCount,
       final AdminStates adminState) {
-    this(nodeID.getIpAddr(), nodeID.getHostName(), nodeID.getStorageID(), nodeID.getXferPort(),
-        nodeID.getInfoPort(), nodeID.getIpcPort(), capacity, dfsUsed, remaining,
-        blockPoolUsed, lastUpdate, xceiverCount, location, adminState);
+    this(nodeID.getIpAddr(), nodeID.getHostName(), nodeID.getStorageID(),
+        nodeID.getXferPort(), nodeID.getInfoPort(), nodeID.getInfoSecurePort(),
+        nodeID.getIpcPort(), capacity, dfsUsed, remaining, blockPoolUsed,
+        lastUpdate, xceiverCount, location, adminState);
   }
 
   /** Constructor */
   public DatanodeInfo(final String ipAddr, final String hostName,
-      final String storageID, final int xferPort, final int infoPort, final int ipcPort,
+      final String storageID, final int xferPort, final int infoPort,
+      final int infoSecurePort, final int ipcPort,
       final long capacity, final long dfsUsed, final long remaining,
       final long blockPoolUsed, final long lastUpdate, final int xceiverCount,
       final String networkLocation, final AdminStates adminState) {
-    super(ipAddr, hostName, storageID, xferPort, infoPort, ipcPort);
+    super(ipAddr, hostName, storageID, xferPort, infoPort,
+            infoSecurePort, ipcPort);
     this.capacity = capacity;
     this.dfsUsed = dfsUsed;
     this.remaining = remaining;

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Wed Oct 30 22:21:59 2013
@@ -222,7 +222,8 @@ public class PBHelper {
   // DatanodeId
   public static DatanodeID convert(DatanodeIDProto dn) {
     return new DatanodeID(dn.getIpAddr(), dn.getHostName(), dn.getStorageID(),
-        dn.getXferPort(), dn.getInfoPort(), dn.getIpcPort());
+        dn.getXferPort(), dn.getInfoPort(), dn.hasInfoSecurePort() ? dn
+        .getInfoSecurePort() : 0, dn.getIpcPort());
   }
 
   public static DatanodeIDProto convert(DatanodeID dn) {
@@ -232,6 +233,7 @@ public class PBHelper {
         .setStorageID(dn.getStorageID())
         .setXferPort(dn.getXferPort())
         .setInfoPort(dn.getInfoPort())
+        .setInfoSecurePort(dn.getInfoSecurePort())
         .setIpcPort(dn.getIpcPort()).build();
   }
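
A hedged round-trip sketch for the compatibility guard above: a proto built
by an older peer simply omits the optional infoSecurePort field, and
hasInfoSecurePort() lets the converter fall back to 0 instead of failing.
The fully qualified HdfsProtos import and the port values are assumptions
for illustration.

    import org.apache.hadoop.hdfs.protocol.DatanodeID;
    import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
    import org.apache.hadoop.hdfs.protocolPB.PBHelper;

    public class PBHelperRoundTrip {
      public static void main(String[] args) {
        DatanodeID dn = new DatanodeID("10.0.0.7", "dn7.example.com", "DS-1234",
            50010, 50075, 50475, 50020);
        DatanodeIDProto proto = PBHelper.convert(dn);
        DatanodeID back = PBHelper.convert(proto);
        // Prints 50475 here; it would be 0 if the proto came from a peer that
        // never set the new optional field.
        System.out.println(back.getInfoSecurePort());
      }
    }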
 

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java Wed Oct 30 22:21:59 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
@@ -58,6 +59,15 @@ public class DelegationTokenSecretManage
       .getLog(DelegationTokenSecretManager.class);
   
   private final FSNamesystem namesystem;
+
+  public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
+      long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
+      long delegationTokenRemoverScanInterval, FSNamesystem namesystem) {
+    this(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
+        delegationTokenRenewInterval, delegationTokenRemoverScanInterval, false,
+        namesystem);
+  }
+
   /**
    * Create a secret manager
    * @param delegationKeyUpdateInterval the number of seconds for rolling new
@@ -67,13 +77,16 @@ public class DelegationTokenSecretManage
    * @param delegationTokenRenewInterval how often the tokens must be renewed
    * @param delegationTokenRemoverScanInterval how often the tokens are scanned
    *        for expired tokens
+   * @param storeTokenTrackingId whether to store the token's tracking id
    */
   public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
       long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
-      long delegationTokenRemoverScanInterval, FSNamesystem namesystem) {
+      long delegationTokenRemoverScanInterval, boolean storeTokenTrackingId,
+      FSNamesystem namesystem) {
     super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
         delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
     this.namesystem = namesystem;
+    this.storeTokenTrackingId = storeTokenTrackingId;
   }
 
   @Override //SecretManager
@@ -103,6 +116,24 @@ public class DelegationTokenSecretManage
     return super.retrievePassword(identifier);
   }
   
+  @Override
+  public byte[] retriableRetrievePassword(DelegationTokenIdentifier identifier)
+      throws InvalidToken, StandbyException, RetriableException, IOException {
+    namesystem.checkOperation(OperationCategory.READ);
+    try {
+      return super.retrievePassword(identifier);
+    } catch (InvalidToken it) {
+      if (namesystem.inTransitionToActive()) {
+        // if the namesystem is currently in the middle of transition to 
+        // active state, let the client retry since the corresponding editlog
+        // may not have been applied yet
+        throw new RetriableException(it);
+      } else {
+        throw it;
+      }
+    }
+  }
+  
   /**
    * Returns expiry time of a token given its identifier.
    * 
@@ -184,7 +215,7 @@ public class DelegationTokenSecretManage
     }
     if (currentTokens.get(identifier) == null) {
       currentTokens.put(identifier, new DelegationTokenInformation(expiryTime,
-          password));
+          password, getTrackingIdIfEnabled(identifier)));
     } else {
       throw new IOException(
           "Same delegation token being added twice; invalid entry in fsimage or editlogs");
@@ -223,7 +254,7 @@ public class DelegationTokenSecretManage
       byte[] password = createPassword(identifier.getBytes(), allKeys
           .get(keyId).getKey());
       currentTokens.put(identifier, new DelegationTokenInformation(expiryTime,
-          password));
+          password, getTrackingIdIfEnabled(identifier)));
     }
   }
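
A generic sketch of the retry idea behind retriableRetrievePassword() (names
below are illustrative, not Hadoop APIs): when a lookup fails during a window
that is known to be transient, here a failover still in progress, the failure
is rewrapped so the caller's retry policy can kick in instead of surfacing a
hard error.

    import java.io.IOException;
    import java.util.concurrent.Callable;

    class RetriableStateException extends IOException {
      RetriableStateException(Throwable cause) {
        super(cause);
      }
    }

    class TransientAwareLookup {
      static byte[] lookup(Callable<byte[]> delegate, boolean inTransitionToActive)
          throws IOException {
        try {
          return delegate.call();
        } catch (Exception e) {
          if (inTransitionToActive) {
            // The edits that create the token may not have been replayed yet;
            // signal "try again later" instead of a hard failure.
            throw new RetriableStateException(e);
          }
          throw new IOException(e); // genuinely invalid: propagate as a hard error
        }
      }
    }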
 

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Wed Oct 30 22:21:59 2013
@@ -506,7 +506,7 @@ public class Balancer {
     final DatanodeInfo datanode;
     final double utilization;
     final long maxSize2Move;
-    protected long scheduledSize = 0L;
+    private long scheduledSize = 0L;
     //  blocks being moved but not confirmed yet
     private List<PendingBlockMove> pendingBlocks = 
       new ArrayList<PendingBlockMove>(MAX_NUM_CONCURRENT_MOVES); 
@@ -555,20 +555,35 @@ public class Balancer {
     }
     
     /** Decide if still need to move more bytes */
-    protected boolean hasSpaceForScheduling() {
+    protected synchronized boolean hasSpaceForScheduling() {
       return scheduledSize<maxSize2Move;
     }
 
     /** Return the total number of bytes that need to be moved */
-    protected long availableSizeToMove() {
+    protected synchronized long availableSizeToMove() {
       return maxSize2Move-scheduledSize;
     }
     
-    /* increment scheduled size */
-    protected void incScheduledSize(long size) {
+    /** increment scheduled size */
+    protected synchronized void incScheduledSize(long size) {
       scheduledSize += size;
     }
     
+    /** decrement scheduled size */
+    protected synchronized void decScheduledSize(long size) {
+      scheduledSize -= size;
+    }
+    
+    /** get scheduled size */
+    protected synchronized long getScheduledSize() {
+      return scheduledSize;
+    }
+    
+    /** set scheduled size */
+    protected synchronized void setScheduledSize(long size) {
+      scheduledSize = size;
+    }
+    
     /* Check if the node can schedule more blocks to move */
     synchronized private boolean isPendingQNotFull() {
       if ( pendingBlocks.size() < MAX_NUM_CONCURRENT_MOVES ) {
@@ -702,8 +717,8 @@ public class Balancer {
           pendingBlock.source = this;
           pendingBlock.target = target;
           if ( pendingBlock.chooseBlockAndProxy() ) {
-            long blockSize = pendingBlock.block.getNumBytes(); 
-            scheduledSize -= blockSize;
+            long blockSize = pendingBlock.block.getNumBytes();
+            decScheduledSize(blockSize);
             task.size -= blockSize;
             if (task.size == 0) {
               tasks.remove();
@@ -747,10 +762,11 @@ public class Balancer {
     private static final long MAX_ITERATION_TIME = 20*60*1000L; //20 mins
     private void dispatchBlocks() {
       long startTime = Time.now();
+      long scheduledSize = getScheduledSize();
       this.blocksToReceive = 2*scheduledSize;
       boolean isTimeUp = false;
       int noPendingBlockIteration = 0;
-      while(!isTimeUp && scheduledSize>0 &&
+      while(!isTimeUp && getScheduledSize()>0 &&
           (!srcBlockList.isEmpty() || blocksToReceive>0)) {
         PendingBlockMove pendingBlock = chooseNextBlockToMove();
         if (pendingBlock != null) {
@@ -779,7 +795,7 @@ public class Balancer {
           // in case no blocks can be moved for source node's task,
           // jump out of while-loop after 5 iterations.
           if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS) {
-            scheduledSize = 0;
+            setScheduledSize(0);
           }
         }
         
@@ -992,7 +1008,7 @@ public class Balancer {
 
     long bytesToMove = 0L;
     for (Source src : sources) {
-      bytesToMove += src.scheduledSize;
+      bytesToMove += src.getScheduledSize();
     }
     return bytesToMove;
   }
@@ -1093,7 +1109,7 @@ public class Balancer {
       bytesMoved += bytes;
     }
 
-    private long get() {
+    private synchronized long get() {
       return bytesMoved;
     }
   };
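
A stand-alone illustration (not the Balancer code) of the pattern the change
above adopts: once several dispatcher threads touch the same counter, every
read and read-modify-write has to go through synchronized accessors (or an
AtomicLong) to stay consistent.

    class ScheduledBytes {
      private long scheduled = 0L;

      synchronized void inc(long bytes) { scheduled += bytes; }
      synchronized void dec(long bytes) { scheduled -= bytes; }
      synchronized long get()           { return scheduled; }
      synchronized void set(long bytes) { scheduled = bytes; }

      // Equivalent of hasSpaceForScheduling(): must read under the same lock.
      synchronized boolean hasRoom(long max) { return scheduled < max; }
    }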