Posted to hdfs-commits@hadoop.apache.org by wa...@apache.org on 2013/08/20 20:07:49 UTC

svn commit: r1515906 [1/2] - in /hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs: dev-support/ src/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/native/libhdfs/ src/main/native/libhdfs/tes...

Author: wang
Date: Tue Aug 20 18:07:47 2013
New Revision: 1515906

URL: http://svn.apache.org/r1515906
Log:
HDFS-4953. Enable HDFS local reads via mmap. Contributed by Colin Patrick McCabe.
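
As a quick orientation for readers of this patch, here is a minimal sketch of how
the new zero-copy read path is meant to be driven from client code.  It is
illustrative only and is not part of the commit: ZeroCopyCursor comes from
org.apache.hadoop.fs on the HDFS-4949 branch, createZeroCopyCursor() is added to
DFSInputStream below, and how an application obtains that stream is outside this
change, so the surrounding method is hypothetical.

    // Sketch only -- not part of this commit.  Assumes the usual imports
    // (java.nio.ByteBuffer, org.apache.hadoop.fs.ZeroCopyCursor, ...).
    static void zeroCopyReadExample(DFSInputStream in, int len) throws IOException {
      ZeroCopyCursor cursor = in.createZeroCopyCursor();
      try {
        // Used when the read cannot be served from an mmap (remote or legacy readers).
        cursor.setFallbackBuffer(ByteBuffer.allocateDirect(len));
        // The mmap path is only taken when checksums can be skipped
        // (see DFSInputStream#readZeroCopy below).
        cursor.setSkipChecksums(true);
        cursor.read(len);                    // zero-copy when possible, slow path otherwise
        ByteBuffer data = cursor.getData();  // mmap-backed buffer, or the (flipped) fallback buffer
        // ... consume "data" before close(), since close() releases the mmap reference ...
      } finally {
        cursor.close();
      }
    }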

Added:
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsZeroCopyCursor.java   (with props)
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java   (with props)
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java   (with props)
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c   (with props)
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c   (with props)
Modified:
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs_test.h
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml?rev=1515906&r1=1515905&r2=1515906&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml Tue Aug 20 18:07:47 2013
@@ -330,4 +330,14 @@
        <Method name="setDirInternal" />
        <Bug pattern="DM_STRING_CTOR" />
      </Match>
+    <Match>
+      <Class name="org.apache.hadoop.hdfs.client.ClientMmapManager" />
+      <Method name="create" />
+      <Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH" />
+    </Match>
+    <Match>
+      <Class name="org.apache.hadoop.hdfs.client.ClientMmapManager" />
+      <Method name="create" />
+      <Bug pattern="UL_UNRELEASED_LOCK" />
+    </Match>
  </FindBugsFilter>

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt?rev=1515906&r1=1515905&r2=1515906&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt Tue Aug 20 18:07:47 2013
@@ -142,6 +142,7 @@ target_link_libraries(test_native_mini_d
 )
 
 add_executable(test_libhdfs_threaded
+    main/native/libhdfs/expect.c
     main/native/libhdfs/test_libhdfs_threaded.c
 )
 target_link_libraries(test_libhdfs_threaded
@@ -150,6 +151,16 @@ target_link_libraries(test_libhdfs_threa
     pthread
 )
 
+add_executable(test_libhdfs_zerocopy
+    main/native/libhdfs/expect.c
+    main/native/libhdfs/test/test_libhdfs_zerocopy.c
+)
+target_link_libraries(test_libhdfs_zerocopy
+    hdfs
+    native_mini_dfs
+    pthread
+)
+
 IF(REQUIRE_LIBWEBHDFS)
     add_subdirectory(contrib/libwebhdfs)
 ENDIF(REQUIRE_LIBWEBHDFS)

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1515906&r1=1515905&r2=1515906&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Tue Aug 20 18:07:47 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 
 /**
  * A BlockReader is responsible for reading a single block
@@ -81,4 +83,21 @@ public interface BlockReader extends Byt
    *                      All short-circuit reads are also local.
    */
   boolean isShortCircuit();
+
+  /**
+   * Do a zero-copy read with the current block reader.
+   *
+   * We assume that the calling code has done bounds checking, and won't ask 
+   * us for more bytes than are supposed to be visible (or are in the file).
+   *
+   * @param buffers       The zero-copy buffers object.
+   * @param curBlock      The current block.
+   * @param blockPos      Position in the current block to start reading at.
+   * @param toRead        The number of bytes to read from the block.
+   * 
+   * @return              true if the read was done, false otherwise.
+   */
+  boolean readZeroCopy(HdfsZeroCopyCursor buffers,
+        LocatedBlock curBlock, long blockPos, int toRead,
+        ClientMmapManager mmapManager);
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1515906&r1=1515905&r2=1515906&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Tue Aug 20 18:07:47 2013
@@ -22,11 +22,15 @@ import java.io.DataInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import org.apache.hadoop.conf.Configuration;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.util.DirectBufferPool;
 import org.apache.hadoop.io.IOUtils;
@@ -87,6 +91,8 @@ class BlockReaderLocal implements BlockR
   private final ExtendedBlock block;
   
   private final FileInputStreamCache fisCache;
+  private ClientMmap clientMmap;
+  private boolean mmapDisabled;
   
   private static int getSlowReadBufferNumChunks(int bufSize,
       int bytesPerChecksum) {
@@ -113,6 +119,8 @@ class BlockReaderLocal implements BlockR
     this.datanodeID = datanodeID;
     this.block = block;
     this.fisCache = fisCache;
+    this.clientMmap = null;
+    this.mmapDisabled = false;
 
     // read and handle the common header here. For now just a version
     checksumIn.getChannel().position(0);
@@ -487,6 +495,10 @@ class BlockReaderLocal implements BlockR
 
   @Override
   public synchronized void close() throws IOException {
+    if (clientMmap != null) {
+      clientMmap.unref();
+      clientMmap = null;
+    }
     if (fisCache != null) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("putting FileInputStream for " + filename +
@@ -534,4 +546,48 @@ class BlockReaderLocal implements BlockR
   public boolean isShortCircuit() {
     return true;
   }
+
+  @Override
+  public boolean readZeroCopy(HdfsZeroCopyCursor cursor,
+        LocatedBlock curBlock, long blockPos, int toRead,
+        ClientMmapManager mmapManager) {
+    if (clientMmap == null) {
+      if (mmapDisabled) {
+        return false;
+      }
+      try {
+        clientMmap = mmapManager.fetch(datanodeID, block, dataIn);
+        if (clientMmap == null) {
+          mmapDisabled = true;
+          return false;
+        }
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted while setting up mmap for " + filename, e);
+        Thread.currentThread().interrupt();
+        return false;
+      } catch (IOException e) {
+        LOG.error("unable to set up mmap for " + filename, e);
+        mmapDisabled = true;
+        return false;
+      }
+    }
+    long limit = blockPos + toRead;
+    if (limit > Integer.MAX_VALUE) {
+      /*
+       * In Java, ByteBuffers use a 32-bit length, capacity, offset, etc.
+       * This limits our mmap'ed regions to 2 GB in length.
+       * TODO: we can implement zero-copy for larger blocks by doing multiple
+       * mmaps.
+       */
+      mmapDisabled = true;
+      clientMmap.unref();
+      clientMmap = null;
+      return false;
+    }
+    ByteBuffer mmapBuffer = clientMmap.getMappedByteBuffer().duplicate();
+    mmapBuffer.position((int)blockPos);
+    mmapBuffer.limit((int)limit);
+    cursor.setMmap(clientMmap, mmapBuffer);
+    return true;
+  }
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java?rev=1515906&r1=1515905&r2=1515906&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java Tue Aug 20 18:07:47 2013
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.util.DirectBufferPool;
@@ -701,4 +703,11 @@ class BlockReaderLocalLegacy implements 
   public boolean isShortCircuit() {
     return true;
   }
+
+  @Override
+  public boolean readZeroCopy(HdfsZeroCopyCursor buffers,
+      LocatedBlock curBlock, long blockPos, int toRead,
+      ClientMmapManager mmapManager) {
+    return false;
+  }
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1515906&r1=1515905&r2=1515906&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Tue Aug 20 18:07:47 2013
@@ -103,6 +103,7 @@ import org.apache.hadoop.fs.ParentNotDir
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -204,7 +205,43 @@ public class DFSClient implements java.i
   private boolean shouldUseLegacyBlockReaderLocal;
   private final CachingStrategy defaultReadCachingStrategy;
   private final CachingStrategy defaultWriteCachingStrategy;
+  private ClientMmapManager mmapManager;
   
+  private static final ClientMmapManagerFactory MMAP_MANAGER_FACTORY =
+      new ClientMmapManagerFactory();
+
+  private static final class ClientMmapManagerFactory {
+    private ClientMmapManager mmapManager = null;
+    /**
+     * Tracks the number of users of mmapManager.
+     */
+    private int refcnt = 0;
+
+    synchronized ClientMmapManager get(Configuration conf) {
+      if (refcnt++ == 0) {
+        mmapManager = ClientMmapManager.fromConf(conf);
+      } else {
+        String mismatches = mmapManager.verifyConfigurationMatches(conf);
+        if (!mismatches.isEmpty()) {
+          LOG.warn("The ClientMmapManager settings you specified " +
+            "have been ignored because another thread created the " +
+            "ClientMmapManager first.  " + mismatches);
+        }
+      }
+      return mmapManager;
+    }
+    
+    synchronized void unref(ClientMmapManager mmapManager) {
+      if (this.mmapManager != mmapManager) {
+        throw new IllegalArgumentException();
+      }
+      if (--refcnt == 0) {
+        IOUtils.cleanup(LOG, mmapManager);
+        this.mmapManager = null;
+      }
+    }
+  }
+
   /**
    * DFSClient configuration 
    */
@@ -513,6 +550,7 @@ public class DFSClient implements java.i
         new CachingStrategy(readDropBehind, readahead);
     this.defaultWriteCachingStrategy =
         new CachingStrategy(writeDropBehind, readahead);
+    this.mmapManager = MMAP_MANAGER_FACTORY.get(conf);
   }
 
   /**
@@ -716,9 +754,12 @@ public class DFSClient implements java.i
   
   /** Abort and release resources held.  Ignore all errors. */
   void abort() {
+    if (mmapManager != null) {
+      MMAP_MANAGER_FACTORY.unref(mmapManager);
+      mmapManager = null;
+    }
     clientRunning = false;
     closeAllFilesBeingWritten(true);
-
     try {
       // remove reference to this client and stop the renewer,
       // if there is no more clients under the renewer.
@@ -762,6 +803,10 @@ public class DFSClient implements java.i
    */
   @Override
   public synchronized void close() throws IOException {
+    if (mmapManager != null) {
+      MMAP_MANAGER_FACTORY.unref(mmapManager);
+      mmapManager = null;
+    }
     if(clientRunning) {
       closeAllFilesBeingWritten(false);
       clientRunning = false;
@@ -2474,4 +2519,8 @@ public class DFSClient implements java.i
   public CachingStrategy getDefaultWriteCachingStrategy() {
     return defaultWriteCachingStrategy;
   }
+
+  ClientMmapManager getMmapManager() {
+    return mmapManager;
+  }
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1515906&r1=1515905&r2=1515906&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Aug 20 18:07:47 2013
@@ -373,6 +373,12 @@ public class DFSConfigKeys extends Commo
   public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT = 1024 * 1024;
   public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = "dfs.client.domain.socket.data.traffic";
   public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false;
+  public static final String DFS_CLIENT_MMAP_CACHE_SIZE = "dfs.client.mmap.cache.size";
+  public static final int DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT = 1024;
+  public static final String DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS = "dfs.client.mmap.cache.timeout.ms";
+  public static final long DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT  = 15 * 60 * 1000;
+  public static final String DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT = "dfs.client.mmap.cache.thread.runs.per.timeout";
+  public static final int DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT  = 4;
 
   // property for fsimage compression
   public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
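
As a hedged illustration, a client that wants to tune the new mmap cache can set
these keys on its Configuration before creating its first DFSClient or
FileSystem.  Note (per the DFSClient change above) that the first client in the
process fixes the cache settings; later clients with different values only get a
warning.

    // Sketch only -- standard Configuration/FileSystem usage, not part of this commit.
    Configuration conf = new Configuration();
    // Cache at most 256 mmap'ed block regions in this client process (default 1024).
    conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 256);
    // Unmap cached regions after 5 minutes without use (default 15 minutes).
    conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 5 * 60 * 1000);
    FileSystem fs = FileSystem.get(conf);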

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1515906&r1=1515905&r2=1515906&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Tue Aug 20 18:07:47 2013
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.CanSetReadah
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.ZeroCopyCursor;
 import org.apache.hadoop.hdfs.net.DomainPeer;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
@@ -92,12 +93,14 @@ implements ByteBufferReadable, CanSetDro
       this.totalBytesRead = 0;
       this.totalLocalBytesRead = 0;
       this.totalShortCircuitBytesRead = 0;
+      this.totalZeroCopyBytesRead = 0;
     }
 
     public ReadStatistics(ReadStatistics rhs) {
       this.totalBytesRead = rhs.getTotalBytesRead();
       this.totalLocalBytesRead = rhs.getTotalLocalBytesRead();
       this.totalShortCircuitBytesRead = rhs.getTotalShortCircuitBytesRead();
+      this.totalZeroCopyBytesRead = rhs.getTotalZeroCopyBytesRead();
     }
 
     /**
@@ -123,6 +126,13 @@ implements ByteBufferReadable, CanSetDro
     public long getTotalShortCircuitBytesRead() {
       return totalShortCircuitBytesRead;
     }
+    
+    /**
+     * @return The total number of zero-copy bytes read.
+     */
+    public long getTotalZeroCopyBytesRead() {
+      return totalZeroCopyBytesRead;
+    }
 
     /**
      * @return The total number of bytes read which were not local.
@@ -145,12 +155,21 @@ implements ByteBufferReadable, CanSetDro
       this.totalLocalBytesRead += amt;
       this.totalShortCircuitBytesRead += amt;
     }
+
+    void addZeroCopyBytes(long amt) {
+      this.totalBytesRead += amt;
+      this.totalLocalBytesRead += amt;
+      this.totalShortCircuitBytesRead += amt;
+      this.totalZeroCopyBytesRead += amt;
+    }
     
     private long totalBytesRead;
 
     private long totalLocalBytesRead;
 
     private long totalShortCircuitBytesRead;
+
+    private long totalZeroCopyBytesRead;
   }
   
   private final FileInputStreamCache fileInputStreamCache;
@@ -1393,4 +1412,67 @@ implements ByteBufferReadable, CanSetDro
     this.cachingStrategy.setDropBehind(dropBehind);
     closeCurrentBlockReader();
   }
+
+  synchronized void readZeroCopy(HdfsZeroCopyCursor zcursor, int toRead)
+      throws IOException {
+    assert(toRead > 0);
+    if (((blockReader == null) || (blockEnd == -1)) &&
+          (pos < getFileLength())) {
+      /*
+       * If we don't have a blockReader, or the one we have has no more bytes
+       * left to read, we call seekToBlockSource to get a new blockReader and
+       * recalculate blockEnd.  Note that we assume we're not at EOF here
+       * (we check this above).
+       */
+      if ((!seekToBlockSource(pos)) || (blockReader == null)) {
+        throw new IOException("failed to allocate new BlockReader " +
+            "at position " + pos);
+      }
+    }
+    long curPos = pos;
+    boolean canSkipChecksums = zcursor.getSkipChecksums();
+    long blockLeft = blockEnd - curPos + 1;
+    if (zcursor.getAllowShortReads()) {
+      if (blockLeft < toRead) {
+        toRead = (int)blockLeft;
+      }
+    }
+    if (canSkipChecksums && (toRead <= blockLeft)) {
+      long blockStartInFile = currentLocatedBlock.getStartOffset();
+      long blockPos = curPos - blockStartInFile;
+      if (blockReader.readZeroCopy(zcursor,
+            currentLocatedBlock, blockPos, toRead,
+            dfsClient.getMmapManager())) {
+        if (DFSClient.LOG.isDebugEnabled()) {
+          DFSClient.LOG.debug("readZeroCopy read " + toRead + " bytes from " +
+              "offset " + curPos + " via the zero-copy read path.  " +
+              "blockEnd = " + blockEnd);
+        }
+        readStatistics.addZeroCopyBytes(toRead);
+        seek(pos + toRead);
+        return;
+      }
+    }
+    /*
+     * Slow path reads.
+     *
+     * readStatistics will be updated when we call back into this
+     * stream's read methods.
+     */
+    long prevBlockEnd = blockEnd;
+    int slowReadAmount = zcursor.readViaSlowPath(toRead);
+    if (DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("readZeroCopy read " + slowReadAmount + " bytes " +
+          "from offset " + curPos + " via the fallback read path.  " +
+          "prevBlockEnd = " + prevBlockEnd + ", blockEnd = " + blockEnd +
+          ", canSkipChecksums = " + canSkipChecksums);
+    }
+  }
+
+  @Override
+  public ZeroCopyCursor createZeroCopyCursor() 
+      throws IOException, UnsupportedOperationException {
+    return new HdfsZeroCopyCursor(this,
+        dfsClient.getConf().skipShortCircuitChecksums);
+  }
 }
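
The new totalZeroCopyBytesRead counter gives callers a way to confirm that a
read really went through the mmap path.  A hedged sketch, assuming
HdfsDataInputStream exposes getReadStatistics() on this branch (the native test
added below checks the same counter via hdfsFileGetReadStatistics):

    // Sketch only -- not part of this commit; fs is a DistributedFileSystem.
    HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path);
    // ... perform a zero-copy read through a cursor, as in the sketch near the top ...
    DFSInputStream.ReadStatistics stats = in.getReadStatistics();
    if (stats.getTotalZeroCopyBytesRead() == 0) {
      System.err.println("read fell back to the copying path");
    }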

Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsZeroCopyCursor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsZeroCopyCursor.java?rev=1515906&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsZeroCopyCursor.java (added)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsZeroCopyCursor.java Tue Aug 20 18:07:47 2013
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ZeroCopyCursor;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+
+public class HdfsZeroCopyCursor implements ZeroCopyCursor  {
+  public static final Log LOG = LogFactory.getLog(HdfsZeroCopyCursor.class);
+  private DFSInputStream stream;
+  private boolean skipChecksums;
+  private boolean allowShortReads;
+  private ClientMmap mmap;
+  private ByteBuffer fallbackBuffer;
+  private ByteBuffer readBuffer;
+  
+  HdfsZeroCopyCursor(DFSInputStream stream, boolean skipChecksums) {
+    this.stream = stream;
+    this.skipChecksums = skipChecksums;
+    this.allowShortReads = false;
+    this.mmap = null;
+    this.fallbackBuffer = null;
+    this.readBuffer = null;
+  }
+
+  @Override
+  public void close() throws IOException {
+    stream = null;
+    if (mmap != null) {
+      mmap.unref();
+      mmap = null;
+    }
+    fallbackBuffer = null;
+    readBuffer = null;
+  }
+
+  @Override
+  public void setFallbackBuffer(ByteBuffer fallbackBuffer) {
+    this.fallbackBuffer = fallbackBuffer;
+  }
+
+  @Override
+  public ByteBuffer getFallbackBuffer() {
+    return this.fallbackBuffer;
+  }
+
+  @Override
+  public void setSkipChecksums(boolean skipChecksums) {
+    this.skipChecksums = skipChecksums;
+  }
+
+  @Override
+  public boolean getSkipChecksums() {
+    return this.skipChecksums;
+  }
+
+  @Override
+  public void setAllowShortReads(boolean allowShortReads) {
+    this.allowShortReads = allowShortReads;
+  }
+
+  @Override
+  public boolean getAllowShortReads() {
+    return this.allowShortReads;
+  }
+
+  @Override
+  public void read(int toRead) throws UnsupportedOperationException,
+      EOFException, IOException {
+    if (toRead < 0) {
+      throw new IllegalArgumentException("can't read " + toRead + " bytes.");
+    }
+    stream.readZeroCopy(this, toRead);
+  }
+
+  @Override
+  public ByteBuffer getData() {
+    return readBuffer;
+  }
+  
+  int readViaSlowPath(int toRead) throws EOFException, IOException {
+    if (fallbackBuffer == null) {
+      throw new UnsupportedOperationException("unable to read via " +
+          "the fastpath, and there was no fallback buffer provided.");
+    }
+    fallbackBuffer.clear();
+    fallbackBuffer.limit(toRead); // will throw if toRead is too large
+  
+    int totalRead = 0;
+    readBuffer = fallbackBuffer;
+    try {
+      while (toRead > 0) {
+        int nread = stream.read(fallbackBuffer);
+        if (nread < 0) {
+          break;
+        }
+        toRead -= nread;
+        totalRead += nread;
+        if (allowShortReads) {
+          break;
+        }
+      }
+    } finally {
+      fallbackBuffer.flip();
+    }
+    if ((toRead > 0) && (!allowShortReads)) {
+      throw new EOFException("only read " + totalRead + " bytes out of " +
+          "a requested " + toRead + " before hitting EOF");
+    }
+    return totalRead;
+  }
+  
+  void setMmap(ClientMmap mmap, ByteBuffer readBuffer) {
+    if (this.mmap != mmap) {
+      if (this.mmap != null) {
+        this.mmap.unref();
+      }
+      this.mmap = mmap;
+      mmap.ref();
+    }
+    this.readBuffer = readBuffer;
+  }
+
+  ClientMmap getMmap() {
+    return mmap;
+  }
+}

Propchange: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsZeroCopyCursor.java
------------------------------------------------------------------------------
    svn:eol-style = native
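
One point worth calling out from readViaSlowPath() above, as a hedged sketch of
the fallback contract: if the fast path is not possible and no fallback buffer
has been supplied, the read fails with UnsupportedOperationException; and once
short reads are allowed, the caller has to check how many bytes actually arrived
rather than assuming the full request was satisfied.

    // Sketch only; "cursor" is an HdfsZeroCopyCursor obtained from a DFSInputStream.
    cursor.setFallbackBuffer(ByteBuffer.allocateDirect(1024 * 1024));
    cursor.setAllowShortReads(true);     // a fallback read may stop early instead of throwing EOFException
    cursor.read(1024 * 1024);
    ByteBuffer data = cursor.getData();  // the flipped fallback buffer when the slow path was used
    int got = data.remaining();          // may be less than requested because short reads are allowed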

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1515906&r1=1515905&r2=1515906&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Tue Aug 20 18:07:47 2013
@@ -27,9 +27,11 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -485,4 +487,11 @@ public class RemoteBlockReader extends F
   public boolean isShortCircuit() {
     return false;
   }
+
+  @Override
+  public boolean readZeroCopy(HdfsZeroCopyCursor buffers,
+      LocatedBlock curBlock, long blockPos, int toRead,
+      ClientMmapManager mmapManager) {
+    return false;
+  }
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1515906&r1=1515905&r2=1515906&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Tue Aug 20 18:07:47 2013
@@ -29,9 +29,11 @@ import java.nio.channels.ReadableByteCha
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
@@ -40,7 +42,6 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@@ -451,4 +452,11 @@ public class RemoteBlockReader2  impleme
   public boolean isShortCircuit() {
     return false;
   }
+
+  @Override
+  public boolean readZeroCopy(HdfsZeroCopyCursor buffers,
+      LocatedBlock curBlock, long blockPos, int toRead,
+      ClientMmapManager manager) {
+    return false;
+  }
 }

Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java?rev=1515906&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java (added)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java Tue Aug 20 18:07:47 2013
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client;
+
+import java.io.FileInputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+
+import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel.MapMode;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A memory-mapped region used by an HDFS client.
+ * 
+ * This class includes a reference count and some other information used by
+ * ClientMmapManager to track and cache mmaps.
+ */
+@InterfaceAudience.Private
+public class ClientMmap {
+  static final Log LOG = LogFactory.getLog(ClientMmap.class);
+  
+  /**
+   * A reference to the manager of this mmap.
+   * 
+   * This is only a weak reference to help minimize the damage done by
+   * code which leaks references accidentally.
+   */
+  private final WeakReference<ClientMmapManager> manager;
+  
+  /**
+   * The actual mapped memory region.
+   */
+  private final MappedByteBuffer map;
+  
+  /**
+   * A reference count tracking how many threads are using this object.
+   */
+  private final AtomicInteger refCount = new AtomicInteger(1);
+
+  /**
+   * Block pertaining to this mmap
+   */
+  private final ExtendedBlock block;
+  
+  /**
+   * The DataNode where this mmap came from.
+   */
+  private final DatanodeID datanodeID;
+
+  /**
+   * The monotonic time when this mmap was last evictable.
+   */
+  private long lastEvictableTimeNs;
+
+  public static ClientMmap load(ClientMmapManager manager, FileInputStream in, 
+      ExtendedBlock block, DatanodeID datanodeID) 
+          throws IOException {
+    MappedByteBuffer map =
+        in.getChannel().map(MapMode.READ_ONLY, 0,
+            in.getChannel().size());
+    return new ClientMmap(manager, map, block, datanodeID);
+  }
+
+  private ClientMmap(ClientMmapManager manager, MappedByteBuffer map, 
+        ExtendedBlock block, DatanodeID datanodeID) 
+            throws IOException {
+    this.manager = new WeakReference<ClientMmapManager>(manager);
+    this.map = map;
+    this.block = block;
+    this.datanodeID = datanodeID;
+    this.lastEvictableTimeNs = 0;
+  }
+
+  /**
+   * Decrement the reference count on this object.
+   * Should be called with the ClientMmapManager lock held.
+   */
+  public void unref() {
+    int count = refCount.decrementAndGet();
+    if (count < 0) {
+      throw new IllegalArgumentException("can't decrement the " +
+          "reference count on this ClientMmap lower than 0.");
+    } else if (count == 0) {
+      ClientMmapManager man = manager.get();
+      if (man == null) {
+        unmap();
+      } else {
+        man.makeEvictable(this);
+      }
+    }
+  }
+
+  /**
+   * Increment the reference count on this object.
+   *
+   * @return     The new reference count.
+   */
+  public int ref() {
+    return refCount.getAndIncrement();
+  }
+
+  @VisibleForTesting
+  public ExtendedBlock getBlock() {
+    return block;
+  }
+
+  DatanodeID getDatanodeID() {
+    return datanodeID;
+  }
+
+  public MappedByteBuffer getMappedByteBuffer() {
+    return map;
+  }
+
+  public void setLastEvictableTimeNs(long lastEvictableTimeNs) {
+    this.lastEvictableTimeNs = lastEvictableTimeNs;
+  }
+
+  public long getLastEvictableTimeNs() {
+    return this.lastEvictableTimeNs;
+  }
+
+  /**
+   * Unmap the memory region.
+   *
+   * There isn't any portable way to unmap a memory region in Java.
+   * So we use the sun.nio method here.
+   * Note that unmapping a memory region could cause crashes if code
+   * continues to reference the unmapped region.  However, if we don't
+   * manually unmap the memory, we are dependent on the finalizer to
+   * do it, and we have no idea when the finalizer will run.
+   */
+  void unmap() {
+    assert(refCount.get() == 0);
+    if (map instanceof sun.nio.ch.DirectBuffer) {
+      final sun.misc.Cleaner cleaner =
+          ((sun.nio.ch.DirectBuffer) map).cleaner();
+      cleaner.clean();
+    }
+  }
+}

Propchange: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java?rev=1515906&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java (added)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java Tue Aug 20 18:07:47 2013
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client;
+
+import java.io.Closeable;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.util.Iterator;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Tracks mmap instances used on an HDFS client.
+ *
+ * mmaps can be used concurrently by multiple threads at once.
+ * mmaps cannot be closed while they are in use.
+ *
+ * The cache is important for performance, because the first time an mmap is
+ * created, the page table entries (PTEs) are not yet set up.
+ * Even when reading data that is entirely resident in memory, reading an
+ * mmap the second time is faster.
+ */
+@InterfaceAudience.Private
+public class ClientMmapManager implements Closeable {
+  public static final Log LOG = LogFactory.getLog(ClientMmapManager.class);
+
+  private boolean closed = false;
+
+  private final int cacheSize;
+
+  private final long timeoutNs;
+
+  private final int runsPerTimeout;
+
+  private final Lock lock = new ReentrantLock();
+  
+  /**
+   * Maps block, datanode_id to the client mmap object.
+   * If the ClientMmap is in the process of being loaded,
+   * {@link Waitable#await()} will block.
+   *
+   * Protected by the ClientMmapManager lock.
+   */
+  private final TreeMap<Key, Waitable<ClientMmap>> mmaps =
+      new TreeMap<Key, Waitable<ClientMmap>>();
+
+  /**
+   * Maps the last use time to the client mmap object.
+   * We ensure that each last use time is unique by inserting a jitter of a
+   * nanosecond or two if necessary.
+   * 
+   * Protected by the ClientMmapManager lock.
+   * ClientMmap objects that are in use are never evictable.
+   */
+  private final TreeMap<Long, ClientMmap> evictable =
+      new TreeMap<Long, ClientMmap>();
+
+  private final ScheduledThreadPoolExecutor executor = 
+      new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().
+          setDaemon(true).setNameFormat("ClientMmapManager").
+          build());
+  
+  /**
+   * The CacheCleaner for this ClientMmapManager.  We don't create this
+   * and schedule it until it becomes necessary.
+   */
+  private CacheCleaner cacheCleaner;
+
+  /**
+   * Factory method to create a ClientMmapManager from a Hadoop
+   * configuration.
+   */
+  public static ClientMmapManager fromConf(Configuration conf) {
+    return new ClientMmapManager(conf.getInt(DFS_CLIENT_MMAP_CACHE_SIZE,
+      DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT),
+      conf.getLong(DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS,
+        DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT),
+      conf.getInt(DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT,
+        DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT));
+  }
+
+  public ClientMmapManager(int cacheSize, long timeoutMs, int runsPerTimeout) {
+    this.cacheSize = cacheSize;
+    this.timeoutNs = timeoutMs * 1000000;
+    this.runsPerTimeout = runsPerTimeout;
+  }
+  
+  long getTimeoutMs() {
+    return this.timeoutNs / 1000000;
+  }
+
+  int getRunsPerTimeout() {
+    return this.runsPerTimeout;
+  }
+  
+  public String verifyConfigurationMatches(Configuration conf) {
+    StringBuilder bld = new StringBuilder();
+    int cacheSize = conf.getInt(DFS_CLIENT_MMAP_CACHE_SIZE,
+                    DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT);
+    if (this.cacheSize != cacheSize) {
+      bld.append("You specified a cache size of ").append(cacheSize).
+          append(", but the existing cache size is ").append(this.cacheSize).
+          append(".  ");
+    }
+    long timeoutMs = conf.getLong(DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS,
+        DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT);
+    if (getTimeoutMs() != timeoutMs) {
+      bld.append("You specified a cache timeout of ").append(timeoutMs).
+          append(" ms, but the existing cache timeout is ").
+          append(getTimeoutMs()).append("ms").append(".  ");
+    }
+    int runsPerTimeout = conf.getInt(
+        DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT,
+        DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT);
+    if (getRunsPerTimeout() != runsPerTimeout) {
+      bld.append("You specified ").append(runsPerTimeout).
+          append(" runs per timeout, but the existing runs per timeout is ").
+          append(getRunsPerTimeout()).append(".  ");
+    }
+    return bld.toString();
+  }
+
+  private static class Waitable<T> {
+    private T val;
+    private final Condition cond;
+
+    public Waitable(Condition cond) {
+      this.val = null;
+      this.cond = cond;
+    }
+
+    public T await() throws InterruptedException {
+      while (this.val == null) {
+        this.cond.await();
+      }
+      return this.val;
+    }
+
+    public void provide(T val) {
+      this.val = val;
+      this.cond.signalAll();
+    }
+  }
+
+  private static class Key implements Comparable<Key> {
+    private final ExtendedBlock block;
+    private final DatanodeID datanode;
+    
+    Key(ExtendedBlock block, DatanodeID datanode) {
+      this.block = block;
+      this.datanode = datanode;
+    }
+
+    /**
+     * Compare two ClientMmap regions that we're storing.
+     *
+     * When we append to a block, we bump the genstamp.  It is important to 
+     * compare the genStamp here.  That way, we will not return a shorter 
+     * mmap than required.
+     */
+    @Override
+    public int compareTo(Key o) {
+      return ComparisonChain.start().
+          compare(block.getBlockId(), o.block.getBlockId()).
+          compare(block.getGenerationStamp(), o.block.getGenerationStamp()).
+          compare(block.getBlockPoolId(), o.block.getBlockPoolId()).
+          compare(datanode, o.datanode).
+          result();
+    }
+
+    @Override
+    public boolean equals(Object rhs) {
+      if (rhs == null) {
+        return false;
+      }
+      try {
+        Key o = (Key)rhs;
+        return (compareTo(o) == 0);
+      } catch (ClassCastException e) {
+        return false;
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      return block.hashCode() ^ datanode.hashCode();
+    }
+  }
+
+  /**
+   * Thread which handles expiring mmaps from the cache.
+   */
+  private static class CacheCleaner implements Runnable, Closeable {
+    private WeakReference<ClientMmapManager> managerRef;
+    private ScheduledFuture<?> future;
+    
+    CacheCleaner(ClientMmapManager manager) {
+      this.managerRef = new WeakReference<ClientMmapManager>(manager);
+    }
+
+    @Override
+    public void run() {
+      ClientMmapManager manager = managerRef.get();
+      if (manager == null) return;
+      long curTime = System.nanoTime();
+      try {
+        manager.lock.lock();
+        manager.evictStaleEntries(curTime);
+      } finally {
+        manager.lock.unlock();
+      }
+    }
+    
+    void setFuture(ScheduledFuture<?> future) {
+      this.future = future;
+    }
+
+    @Override
+    public void close() throws IOException {
+      future.cancel(false);
+    }
+  }
+
+  /**
+   * Evict entries which became evictable more than timeoutNs before curTime.
+   *
+   * NOTE: you must call this function with the lock held.
+   */
+  private void evictStaleEntries(long curTime) {
+    if (closed) {
+      return;
+    }
+    Iterator<Entry<Long, ClientMmap>> iter =
+        evictable.entrySet().iterator(); 
+    while (iter.hasNext()) {
+      Entry<Long, ClientMmap> entry = iter.next();
+      if (entry.getKey() + timeoutNs >= curTime) {
+        return;
+      }
+      ClientMmap mmap = entry.getValue();
+      Key key = new Key(mmap.getBlock(), mmap.getDatanodeID());
+      mmaps.remove(key);
+      iter.remove();
+      mmap.unmap();
+    }
+  }
+
+  /**
+   * Evict one mmap object from the cache.
+   *
+   * NOTE: you must call this function with the lock held.
+   *
+   * @return                  True if an object was evicted; false if none
+   *                          could be evicted.
+   */
+  private boolean evictOne() {
+    Entry<Long, ClientMmap> entry = evictable.pollFirstEntry();
+    if (entry == null) {
+      // We don't want to try creating another mmap region, because the
+      // cache is full.
+      return false;
+    }
+    ClientMmap evictedMmap = entry.getValue(); 
+    Key evictedKey = new Key(evictedMmap.getBlock(), 
+                             evictedMmap.getDatanodeID());
+    mmaps.remove(evictedKey);
+    evictedMmap.unmap();
+    return true;
+  }
+
+  /**
+   * Create a new mmap object.
+   * 
+   * NOTE: you must call this function with the lock held.
+   *
+   * @param key              The key which describes this mmap.
+   * @param in               The input stream to use to create the mmap.
+   * @return                 The new mmap object, or null if there were
+   *                         insufficient resources.
+   * @throws IOException     If there was an I/O error creating the mmap.
+   */
+  private ClientMmap create(Key key, FileInputStream in) throws IOException {
+    if (mmaps.size() + 1 > cacheSize) {
+      if (!evictOne()) {
+        LOG.warn("mmap cache is full (with " + cacheSize + " elements) and " +
+              "nothing is evictable.  Ignoring request for mmap with " +
+              "datanodeID=" + key.datanode + ", " + "block=" + key.block);
+        return null;
+      }
+    }
+    // Create the condition variable that other threads may wait on.
+    Waitable<ClientMmap> waitable =
+        new Waitable<ClientMmap>(lock.newCondition());
+    mmaps.put(key, waitable);
+    // Load the entry
+    boolean success = false;
+    ClientMmap mmap = null;
+    try {
+      try {
+        lock.unlock();
+        mmap = ClientMmap.load(this, in, key.block, key.datanode);
+      } finally {
+        lock.lock();
+      }
+      if (cacheCleaner == null) {
+        cacheCleaner = new CacheCleaner(this);
+        ScheduledFuture<?> future = 
+            executor.scheduleAtFixedRate(cacheCleaner,
+                timeoutNs, timeoutNs / runsPerTimeout, TimeUnit.NANOSECONDS);
+        cacheCleaner.setFuture(future);
+      }
+      success = true;
+    } finally {
+      if (!success) {
+        LOG.warn("failed to create mmap for datanodeID=" + key.datanode +
+                  ", " + "block=" + key.block);
+        mmaps.remove(key);
+      }
+      waitable.provide(mmap);
+    }
+    return mmap;
+  }
+
+  /**
+   * Get or create an mmap region.
+   * 
+   * @param datanodeID The DataNode that owns the block for this mmap region.
+   * @param block      The block ID, block pool ID, and generation stamp of 
+   *                     the block we want to read.
+   * @param in         An open file for this block.  This stream is only used
+   *                     if we have to create a new mmap; if we use an
+   *                     existing one, it is ignored.
+   *
+   * @return           The client mmap region, or null if there were
+   *                     insufficient resources to create one.
+   */
+  public ClientMmap fetch(DatanodeID datanodeID, ExtendedBlock block,
+      FileInputStream in) throws IOException, InterruptedException {
+    LOG.debug("fetching mmap with datanodeID=" + datanodeID + ", " +
+        "block=" + block);
+    Key key = new Key(block, datanodeID);
+    ClientMmap mmap = null;
+    try {
+      lock.lock();
+      if (closed) {
+        throw new IOException("ClientMmapManager is closed.");
+      }
+      while (mmap == null) {
+        Waitable<ClientMmap> entry = mmaps.get(key);
+        if (entry == null) {
+          return create(key, in);
+        }
+        mmap = entry.await();
+      }
+      if (mmap.ref() == 1) {
+        // When going from nobody using the mmap (ref = 0) to somebody
+        // using the mmap (ref = 1), we must make the mmap un-evictable.
+        evictable.remove(mmap.getLastEvictableTimeNs());
+      }
+    }
+    finally {
+      lock.unlock();
+    }
+    LOG.debug("reusing existing mmap with datanodeID=" + datanodeID +
+              ", " + "block=" + block);
+    return mmap;
+  }
+
+  /**
+   * Make an mmap evictable.
+   * 
+   * When an mmap is evictable, it may be removed from the cache if necessary.
+   * mmaps can only be evictable if nobody is using them.
+   *
+   * @param mmap             The mmap to make evictable.
+   */
+  void makeEvictable(ClientMmap mmap) {
+    try {
+      lock.lock();
+      if (closed) {
+        // If this ClientMmapManager is closed, then don't bother with the
+        // cache; just close the mmap.
+        mmap.unmap();
+        return;
+      }
+      long now = System.nanoTime();
+      while (evictable.containsKey(now)) {
+        now++;
+      }
+      mmap.setLastEvictableTimeNs(now);
+      evictable.put(now, mmap);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      lock.lock();
+      closed = true;
+      IOUtils.cleanup(LOG, cacheCleaner);
+
+      // Unmap all the mmaps that nobody is using.
+      // The ones which are in use will be unmapped just as soon as people stop
+      // using them.
+      evictStaleEntries(Long.MAX_VALUE);
+
+      executor.shutdown();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @VisibleForTesting
+  public interface ClientMmapVisitor {
+    void accept(ClientMmap mmap);
+  }
+
+  @VisibleForTesting
+  public synchronized void visitMmaps(ClientMmapVisitor visitor)
+      throws InterruptedException {
+    for (Waitable<ClientMmap> entry : mmaps.values()) {
+      visitor.accept(entry.await());
+    }
+  }
+
+  public void visitEvictable(ClientMmapVisitor visitor)
+      throws InterruptedException {
+    for (ClientMmap mmap : evictable.values()) {
+      visitor.accept(mmap);
+    }
+  }
+}

Propchange: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java
------------------------------------------------------------------------------
    svn:eol-style = native
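
For completeness, a hedged sketch of the manager's lifecycle.  This is an
internal (@InterfaceAudience.Private) API that DFSClient normally drives through
its ClientMmapManagerFactory; the variable names here are hypothetical.

    // Sketch only; blockStream is an open FileInputStream for the local block file.
    ClientMmapManager manager = ClientMmapManager.fromConf(conf);
    ClientMmap mmap = manager.fetch(datanodeID, block, blockStream);  // null if the cache is full
                                                                      // and nothing is evictable
    if (mmap != null) {
      try {
        ByteBuffer buf = mmap.getMappedByteBuffer().duplicate();      // the caller sets position/limit
        // ... read from buf ...
      } finally {
        mmap.unref();      // at refcount zero the mmap becomes evictable again
      }
    }
    manager.close();       // unmaps every cached region that is no longer referenced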

Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c?rev=1515906&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c (added)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c Tue Aug 20 18:07:47 2013
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "expect.h"
+#include "hdfs.h"
+
+#include <inttypes.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+int expectFileStats(hdfsFile file,
+      uint64_t expectedTotalBytesRead,
+      uint64_t expectedTotalLocalBytesRead,
+      uint64_t expectedTotalShortCircuitBytesRead,
+      uint64_t expectedTotalZeroCopyBytesRead)
+{
+    struct hdfsReadStatistics *stats = NULL;
+    EXPECT_ZERO(hdfsFileGetReadStatistics(file, &stats));
+    if (expectedTotalBytesRead != UINT64_MAX) {
+        EXPECT_INT64_EQ(expectedTotalBytesRead, stats->totalBytesRead);
+    }
+    if (expectedTotalLocalBytesRead != UINT64_MAX) {
+        EXPECT_INT64_EQ(expectedTotalLocalBytesRead,
+                      stats->totalLocalBytesRead);
+    }
+    if (expectedTotalShortCircuitBytesRead != UINT64_MAX) {
+        EXPECT_INT64_EQ(expectedTotalShortCircuitBytesRead,
+                      stats->totalShortCircuitBytesRead);
+    }
+    if (expectedTotalZeroCopyBytesRead != UINT64_MAX) {
+        EXPECT_INT64_EQ(expectedTotalZeroCopyBytesRead,
+                      stats->totalZeroCopyBytesRead);
+    }
+    hdfsFileFreeReadStatistics(stats);
+    return 0;
+}

Propchange: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h?rev=1515906&r1=1515905&r2=1515906&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h Tue Aug 20 18:07:47 2013
@@ -19,16 +19,19 @@
 #ifndef LIBHDFS_NATIVE_TESTS_EXPECT_H
 #define LIBHDFS_NATIVE_TESTS_EXPECT_H
 
+#include <inttypes.h>
 #include <stdio.h>
 
+struct hdfsFile_internal;
+
 #define EXPECT_ZERO(x) \
     do { \
         int __my_ret__ = x; \
         if (__my_ret__) { \
             int __my_errno__ = errno; \
-            fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
 		    "code %d (errno: %d): got nonzero from %s\n", \
-		    __LINE__, __my_ret__, __my_errno__, #x); \
+		    __FILE__, __LINE__, __my_ret__, __my_errno__, #x); \
             return __my_ret__; \
         } \
     } while (0);
@@ -38,9 +41,9 @@
         void* __my_ret__ = x; \
         int __my_errno__ = errno; \
         if (__my_ret__ != NULL) { \
-            fprintf(stderr, "TEST_ERROR: failed on line %d (errno: %d): " \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d (errno: %d): " \
 		    "got non-NULL value %p from %s\n", \
-		    __LINE__, __my_errno__, __my_ret__, #x); \
+		    __FILE__, __LINE__, __my_errno__, __my_ret__, #x); \
             return -1; \
         } \
     } while (0);
@@ -50,8 +53,8 @@
         void* __my_ret__ = x; \
         int __my_errno__ = errno; \
         if (__my_ret__ == NULL) { \
-            fprintf(stderr, "TEST_ERROR: failed on line %d (errno: %d): " \
-		    "got NULL from %s\n", __LINE__, __my_errno__, #x); \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d (errno: %d): " \
+		    "got NULL from %s\n", __FILE__, __LINE__, __my_errno__, #x); \
             return -1; \
         } \
     } while (0);
@@ -61,15 +64,16 @@
         int __my_ret__ = x; \
         int __my_errno__ = errno; \
         if (__my_ret__ != -1) { \
-            fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
-                "code %d (errno: %d): expected -1 from %s\n", __LINE__, \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
+                "code %d (errno: %d): expected -1 from %s\n", \
+                    __FILE__, __LINE__, \
                 __my_ret__, __my_errno__, #x); \
             return -1; \
         } \
         if (__my_errno__ != e) { \
-            fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
                 "code %d (errno: %d): expected errno = %d from %s\n", \
-                __LINE__, __my_ret__, __my_errno__, e, #x); \
+                __FILE__, __LINE__, __my_ret__, __my_errno__, e, #x); \
             return -1; \
 	} \
     } while (0);
@@ -79,9 +83,9 @@
         int __my_ret__ = x; \
         int __my_errno__ = errno; \
         if (!__my_ret__) { \
-            fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
-		    "code %d (errno: %d): got zero from %s\n", __LINE__, \
-                __my_ret__, __my_errno__, #x); \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
+              "code %d (errno: %d): got zero from %s\n", __FILE__, __LINE__, \
+              __my_ret__, __my_errno__, #x); \
             return -1; \
         } \
     } while (0);
@@ -91,9 +95,9 @@
         int __my_ret__ = x; \
         int __my_errno__ = errno; \
         if (__my_ret__ < 0) { \
-            fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
                 "code %d (errno: %d): got negative return from %s\n", \
-		    __LINE__, __my_ret__, __my_errno__, #x); \
+                __FILE__, __LINE__, __my_ret__, __my_errno__, #x); \
             return __my_ret__; \
         } \
     } while (0);
@@ -103,9 +107,21 @@
         int __my_ret__ = y; \
         int __my_errno__ = errno; \
         if (__my_ret__ != (x)) { \
-            fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
               "code %d (errno: %d): expected %d\n", \
-               __LINE__, __my_ret__, __my_errno__, (x)); \
+               __FILE__, __LINE__, __my_ret__, __my_errno__, (x)); \
+            return -1; \
+        } \
+    } while (0);
+
+#define EXPECT_INT64_EQ(x, y) \
+    do { \
+        int64_t __my_ret__ = y; \
+        int __my_errno__ = errno; \
+        if (__my_ret__ != (x)) { \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
+              "value %"PRId64" (errno: %d): expected %"PRId64"\n", \
+               __FILE__, __LINE__, __my_ret__, __my_errno__, (x)); \
             return -1; \
         } \
     } while (0);
@@ -117,4 +133,17 @@
     ret = -errno; \
     } while (ret == -EINTR);
 
+/**
+ * Test that an HDFS file has the given statistics.
+ *
+ * Any parameter can be set to UINT64_MAX to avoid checking it.
+ *
+ * @return 0 on success; error code otherwise
+ */
+int expectFileStats(struct hdfsFile_internal *file,
+      uint64_t expectedTotalBytesRead,
+      uint64_t expectedTotalLocalBytesRead,
+      uint64_t expectedTotalShortCircuitBytesRead,
+      uint64_t expectedTotalZeroCopyBytesRead);
+
 #endif
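
For context, here is a minimal sketch of how a libhdfs test might combine the
new expectFileStats() helper with the EXPECT_* macros above. The path, buffer
size, and the assumption that a single hdfsRead() fills the buffer are
illustrative only, not part of this patch:

    #include "expect.h"
    #include "hdfs.h"

    #include <errno.h>
    #include <fcntl.h>
    #include <stdint.h>

    /* Sketch: assumes 'fs' is connected and "/test" holds at least 8192
     * bytes that one hdfsRead() call returns in full. */
    static int checkReadStats(hdfsFS fs)
    {
        char buf[8192];
        hdfsFile file = hdfsOpenFile(fs, "/test", O_RDONLY, 0, 0, 0);
        EXPECT_NONNULL(file);
        EXPECT_INT64_EQ((int64_t)sizeof(buf),
                        hdfsRead(fs, file, buf, sizeof(buf)));
        /* Check only totalBytesRead; UINT64_MAX skips the other counters. */
        EXPECT_ZERO(expectFileStats(file, sizeof(buf), UINT64_MAX,
                                    UINT64_MAX, UINT64_MAX));
        EXPECT_ZERO(hdfsCloseFile(fs, file));
        return 0;
    }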

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c?rev=1515906&r1=1515905&r2=1515906&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c Tue Aug 20 18:07:47 2013
@@ -39,6 +39,7 @@
 #define JAVA_NET_ISA    "java/net/InetSocketAddress"
 #define JAVA_NET_URI    "java/net/URI"
 #define JAVA_STRING     "java/lang/String"
+#define HADOOP_ZERO_COPY_CURSOR "org/apache/hadoop/fs/ZeroCopyCursor"
 
 #define JAVA_VOID       "V"
 
@@ -143,6 +144,15 @@ int hdfsFileGetReadStatistics(hdfsFile f
         goto done;
     }
     s->totalShortCircuitBytesRead = jVal.j;
+    jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
+                  "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
+                  "getTotalZeroCopyBytesRead", "()J");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "hdfsFileGetReadStatistics: getTotalZeroCopyBytesRead failed");
+        goto done;
+    }
+    s->totalZeroCopyBytesRead = jVal.j;
     *stats = s;
     s = NULL;
     ret = 0;
@@ -183,6 +193,25 @@ void hdfsFileDisableDirectRead(hdfsFile 
     file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ;
 }
 
+int hdfsDisableDomainSocketSecurity(void)
+{
+    jthrowable jthr;
+    JNIEnv* env = getJNIEnv();
+    if (env == NULL) {
+      errno = EINTERNAL;
+      return -1;
+    }
+    jthr = invokeMethod(env, NULL, STATIC, NULL,
+            "org/apache/hadoop/net/unix/DomainSocket",
+            "disableBindPathValidation", "()V");
+    if (jthr) {
+        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "DomainSocket#disableBindPathValidation");
+        return -1;
+    }
+    return 0;
+}
+
 /**
  * hdfsJniEnv: A wrapper struct to be used as 'value'
  * while saving thread -> JNIEnv* mappings
@@ -220,40 +249,6 @@ static jthrowable constructNewObjectOfPa
     return NULL;
 }
 
-/**
- * Set a configuration value.
- *
- * @param env               The JNI environment
- * @param jConfiguration    The configuration object to modify
- * @param key               The key to modify
- * @param value             The value to set the key to
- *
- * @return                  NULL on success; exception otherwise
- */
-static jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
-        const char *key, const char *value)
-{
-    jthrowable jthr;
-    jstring jkey = NULL, jvalue = NULL;
-
-    jthr = newJavaStr(env, key, &jkey);
-    if (jthr)
-        goto done;
-    jthr = newJavaStr(env, value, &jvalue);
-    if (jthr)
-        goto done;
-    jthr = invokeMethod(env, NULL, INSTANCE, jConfiguration,
-            HADOOP_CONF, "set", JMETHOD2(JPARAM(JAVA_STRING),
-                                         JPARAM(JAVA_STRING), JAVA_VOID),
-            jkey, jvalue);
-    if (jthr)
-        goto done;
-done:
-    destroyLocalReference(env, jkey);
-    destroyLocalReference(env, jvalue);
-    return jthr;
-}
-
 static jthrowable hadoopConfGetStr(JNIEnv *env, jobject jConfiguration,
         const char *key, char **val)
 {
@@ -2108,6 +2103,248 @@ int hdfsUtime(hdfsFS fs, const char* pat
     return 0;
 }
 
+struct hadoopZeroCopyCursor* hadoopZeroCopyCursorAlloc(hdfsFile file)
+{
+    int ret;
+    jobject zcursor = NULL;
+    jvalue jVal;
+    jthrowable jthr;
+    JNIEnv* env;
+
+    env = getJNIEnv();
+    if (env == NULL) {
+        errno = EINTERNAL;
+        return NULL;
+    }
+    if (file->type != INPUT) {
+        ret = EINVAL;
+        goto done;
+    }
+    jthr = invokeMethod(env, &jVal, INSTANCE, (jobject)file->file, HADOOP_ISTRM,
+                     "createZeroCopyCursor", "()L"HADOOP_ZERO_COPY_CURSOR";");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "hadoopZeroCopyCursorAlloc: createZeroCopyCursor");
+        goto done;
+    }
+    zcursor = (*env)->NewGlobalRef(env, jVal.l);
+    if (!zcursor) {
+        ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+            "hadoopZeroCopyCursorAlloc: NewGlobalRef");
+        goto done;
+    }
+    ret = 0;
+done:
+    if (ret) {
+        errno = ret;
+    }
+    return (struct hadoopZeroCopyCursor*)zcursor;
+}
+
+int hadoopZeroCopyCursorSetFallbackBuffer(struct hadoopZeroCopyCursor* zcursor,
+                                          void *cbuf, uint32_t size)
+{
+    int ret;
+    jobject buffer = NULL;
+    jthrowable jthr;
+    JNIEnv* env;
+
+    env = getJNIEnv();
+    if (env == NULL) {
+        errno = EINTERNAL;
+        return -1;
+    }
+    buffer = (*env)->NewDirectByteBuffer(env, cbuf, size);
+    if (!buffer) {
+        ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+            "hadoopZeroCopyCursorSetFallbackBuffer: NewDirectByteBuffer("
+            "size=%"PRId32"):", size);
+        goto done;
+    }
+    jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor, 
+                    HADOOP_ZERO_COPY_CURSOR, "setFallbackBuffer",
+                    "(Ljava/nio/ByteBuffer;)V", buffer);
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                      "hadoopZeroCopyCursorSetFallbackBuffer: "
+                      "FileSystem#setFallbackBuffer");
+        goto done;
+    }
+    ret = 0;
+done:
+    if (ret) {
+        (*env)->DeleteLocalRef(env, buffer);
+        errno = ret;
+        return -1;
+    }
+    return 0;
+}
+
+int hadoopZeroCopyCursorSetSkipChecksums(struct hadoopZeroCopyCursor* zcursor,
+                                         int skipChecksums)
+{
+    JNIEnv* env;
+    jthrowable jthr;
+    jboolean shouldSkipChecksums = skipChecksums ? JNI_TRUE : JNI_FALSE; 
+
+    env = getJNIEnv();
+    if (env == NULL) {
+        errno = EINTERNAL;
+        return -1;
+    }
+    jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor,
+        HADOOP_ZERO_COPY_CURSOR, "setSkipChecksums", "(Z)V",
+        shouldSkipChecksums);
+    if (jthr) {
+        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "hadoopZeroCopyCursorSetSkipChecksums(): setSkipChecksums failed");
+        return -1;
+    }
+    return 0;
+}
+
+int hadoopZeroCopyCursorSetAllowShortReads(
+            struct hadoopZeroCopyCursor* zcursor, int allowShort)
+{
+    JNIEnv* env;
+    jthrowable jthr;
+    jboolean shouldAllowShort = allowShort ? JNI_TRUE : JNI_FALSE;
+
+    env = getJNIEnv();
+    if (env == NULL) {
+        errno = EINTERNAL;
+        return -1;
+    }
+    jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor,
+        HADOOP_ZERO_COPY_CURSOR, "setAllowShortReads", "(Z)V",
+        shouldAllowShort);
+    if (jthr) {
+        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "hadoopZeroCopyCursorSetAllowShortReads(): setAllowShortReads "
+            "failed");
+        return -1;
+    }
+    return 0;
+}
+
+void hadoopZeroCopyCursorFree(struct hadoopZeroCopyCursor *zcursor)
+{
+    JNIEnv* env;
+    jthrowable jthr;
+
+    env = getJNIEnv();
+    if (env == NULL) {
+        return;
+    }
+    jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor,
+                     HADOOP_ZERO_COPY_CURSOR, "close", "()V");
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hadoopZeroCopyCursorFree(): close failed");
+    }
+    (*env)->DeleteGlobalRef(env, (jobject)zcursor);
+}
+
+/**
+ *  Translate an exception from ZeroCopyCursor#read into a return code.
+ */
+static int translateZCRException(JNIEnv *env, jthrowable exc)
+{
+    int ret;
+    char *className = NULL;
+    jthrowable jthr = classNameOfObject(exc, env, &className);
+
+    if (jthr) {
+        fprintf(stderr, "hadoopZeroCopyRead: unknown "
+                "exception from read().\n");
+        destroyLocalReference(env, exc);
+        destroyLocalReference(env, jthr);
+        ret = EIO;
+        goto done;
+    } 
+    if (!strcmp(className, "java.io.EOFException")) {
+        ret = 0; // EOF
+        goto done;
+    }
+    if (!strcmp(className, "java.lang.UnsupportedOperationException")) {
+        ret = EPROTONOSUPPORT;
+        goto done;
+    }
+    ret = printExceptionAndFree(env, exc, PRINT_EXC_ALL,
+            "hadoopZeroCopyRead: ZeroCopyCursor#read failed");
+done:
+    free(className);
+    return ret;
+}
+
+int32_t hadoopZeroCopyRead(struct hadoopZeroCopyCursor *zcursor,
+                           int32_t toRead, const void **data)
+{
+    int32_t ret, nRead = -1;
+    JNIEnv* env;
+    jthrowable jthr;
+    jobject byteBuffer = NULL;
+    uint8_t *addr;
+    jint position;
+    jvalue jVal;
+    
+    env = getJNIEnv();
+    if (env == NULL) {
+        errno = EINTERNAL;
+        return -1;
+    }
+    jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor,
+                     HADOOP_ZERO_COPY_CURSOR, "read", "(I)V", toRead);
+    if (jthr) {
+        ret = translateZCRException(env, jthr);
+        goto done;
+    }
+    jthr = invokeMethod(env, &jVal, INSTANCE, (jobject)zcursor,
+                     HADOOP_ZERO_COPY_CURSOR, "getData",
+                     "()Ljava/nio/ByteBuffer;");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hadoopZeroCopyRead(toRead=%"PRId32"): getData failed",
+                toRead);
+        goto done;
+    }
+    byteBuffer = jVal.l;
+    addr = (*env)->GetDirectBufferAddress(env, byteBuffer);
+    if (!addr) {
+        fprintf(stderr, "hadoopZeroCopyRead(toRead=%"PRId32"): "
+                    "failed to get direct buffer address.\n", toRead);
+        ret = EIO;
+        goto done;
+    }
+    jthr = invokeMethod(env, &jVal, INSTANCE, byteBuffer,
+                     "java/nio/ByteBuffer", "position", "()I");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hadoopZeroCopyRead(toRead=%"PRId32"): ByteBuffer#position "
+                "failed", toRead);
+        goto done;
+    }
+    position = jVal.i;
+    jthr = invokeMethod(env, &jVal, INSTANCE, byteBuffer,
+                     "java/nio/ByteBuffer", "remaining", "()I");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hadoopZeroCopyRead(toRead=%"PRId32"): ByteBuffer#remaining "
+                "failed", toRead);
+        goto done;
+    }
+    ret = 0;
+    nRead = jVal.i;
+    *data = addr + position;
+done:
+    (*env)->DeleteLocalRef(env, byteBuffer);
+    if (nRead == -1) {
+        errno = ret;
+        return -1;
+    }
+    return nRead;
+}
+
 char***
 hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length)
 {
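
The new totalZeroCopyBytesRead counter surfaces through the same
read-statistics calls as the existing counters. A rough sketch of reading it
back (the function name is illustrative; 'file' is assumed to be open for
reading):

    #include "hdfs.h"

    #include <inttypes.h>
    #include <stdio.h>

    /* Print the zero-copy counter for an open file.
     * Returns 0 on success, -1 otherwise (errno is set by libhdfs). */
    static int printZeroCopyBytes(hdfsFile file)
    {
        struct hdfsReadStatistics *stats = NULL;
        if (hdfsFileGetReadStatistics(file, &stats)) {
            return -1;
        }
        printf("zero-copy bytes read: %" PRIu64 "\n",
               stats->totalZeroCopyBytesRead);
        hdfsFileFreeReadStatistics(stats);
        return 0;
    }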

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h?rev=1515906&r1=1515905&r2=1515906&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h Tue Aug 20 18:07:47 2013
@@ -85,6 +85,7 @@ extern  "C" {
       uint64_t totalBytesRead;
       uint64_t totalLocalBytesRead;
       uint64_t totalShortCircuitBytesRead;
+      uint64_t totalZeroCopyBytesRead;
     };
 
     /**
@@ -680,7 +681,89 @@ extern  "C" {
      * @return 0 on success else -1
      */
     int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime);
-    
+
+    /**
+     * Create a zero-copy cursor object.
+     *
+     * @param file        The file to use for zero-copy reads.
+     *
+     * @return            The zero-copy cursor, or NULL + errno on failure.
+     */
+    struct hadoopZeroCopyCursor* hadoopZeroCopyCursorAlloc(hdfsFile file);
+
+    /**
+     * Set the fallback buffer that will be used by the zero-copy cursor.
+     *
+     * You are responsible for ensuring that this buffer stays valid until you
+     * either set a different buffer by calling this function again, or free the
+     * zero-copy cursor.
+     *
+     * @param zcursor     The zero-copy cursor.
+     * @param cbuf        The buffer to use.
+     * @param size        Size of the buffer.
+     *
+     * @return            0 on success.  -1 on error.  Errno will be set on
+     *                    error. 
+     */
+    int hadoopZeroCopyCursorSetFallbackBuffer(
+              struct hadoopZeroCopyCursor* zcursor, void *cbuf, uint32_t size);
+
+    /**
+     * Set whether our cursor should skip checksums or not.
+     *
+     * @param zcursor        The cursor
+     * @param skipChecksums  Nonzero to skip checksums.
+     *
+     * @return               -1 on error, 0 otherwise.
+     */
+    int hadoopZeroCopyCursorSetSkipChecksums(
+            struct hadoopZeroCopyCursor* zcursor, int skipChecksums);
+
+    /**
+     * Set whether our cursor should allow short reads to occur.
+     * Short reads will always occur if there is not enough data to read
+     * (i.e., at EOF), but normally we don't return them when reading other
+     * parts of the file.
+     *
+     * @param zcursor        The cursor
+     * @param allowShort     Nonzero to allow short reads.
+     *
+     * @return               -1 on error, 0 otherwise.
+     */
+    int hadoopZeroCopyCursorSetAllowShortReads(
+                struct hadoopZeroCopyCursor* zcursor, int allowShort);
+
+    /**
+     * Free zero-copy cursor.
+     *
+     * This will dispose of the cursor allocated by hadoopZeroCopyCursorAlloc, as
+     * well as any memory map that we have created.  You must be done with the
+     * data returned from hadoopZeroCopyRead before calling this.
+     *
+     * @param zcursor     The zero-copy cursor.
+     */
+    void hadoopZeroCopyCursorFree(struct hadoopZeroCopyCursor *zcursor);
+
+    /**
+     * Perform a zero-copy read.
+     *
+     * @param zcursor     The zero-copy cursor object.
+     * @param toRead      The maximum amount to read.
+     * @param data        (out param) on successful return, a pointer to the
+     *                    data.  This pointer will remain valid until the next
+     *                    call to hadoopZeroCopyRead, or until
+     *                    hadoopZeroCopyCursorFree is called on zcursor.
+     *
+     * @return            -1 if there was an error; errno will be set to the
+     *                    error (EPROTONOSUPPORT if zero-copy reads are not
+     *                    supported for this stream).
+     *                    0 if we hit end-of-file without reading anything.
+     *                    Otherwise, the positive number of bytes read; short
+     *                    reads happen only when EOF is reached.
+     */
+    int32_t hadoopZeroCopyRead(struct hadoopZeroCopyCursor *zcursor,
+                             int32_t toRead, const void **data);
+
 #ifdef __cplusplus
 }
 #endif
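
Putting the declarations above together, a caller's read path would look
roughly like the sketch below. The helper name, buffer handling, and the
memcpy into the caller's buffer are illustrative; a real zero-copy consumer
would work on 'data' in place:

    #include "hdfs.h"

    #include <stdint.h>
    #include <stdlib.h>
    #include <string.h>

    /* Read up to 'len' bytes from 'file' through a zero-copy cursor and copy
     * them into 'out'.  Returns the number of bytes read, or -1 on error. */
    static int32_t zeroCopyReadInto(hdfsFile file, void *out, int32_t len)
    {
        struct hadoopZeroCopyCursor *zc;
        void *fallback = NULL;
        const void *data = NULL;
        int32_t nRead = -1;

        zc = hadoopZeroCopyCursorAlloc(file);
        if (!zc)
            return -1;
        /* The fallback buffer is used when a zero-copy (mmap) read cannot
         * be satisfied. */
        fallback = malloc(len);
        if (!fallback)
            goto done;
        if (hadoopZeroCopyCursorSetFallbackBuffer(zc, fallback, len))
            goto done;
        if (hadoopZeroCopyCursorSetSkipChecksums(zc, 0))
            goto done;
        nRead = hadoopZeroCopyRead(zc, len, &data);
        if (nRead > 0)
            memcpy(out, data, nRead);
    done:
        /* 'data' must not be dereferenced after the cursor is freed. */
        hadoopZeroCopyCursorFree(zc);
        free(fallback);
        return nRead;
    }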

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs_test.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs_test.h?rev=1515906&r1=1515905&r2=1515906&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs_test.h (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs_test.h Tue Aug 20 18:07:47 2013
@@ -48,6 +48,15 @@ extern  "C" {
      * @param file     The HDFS file
      */
     void hdfsFileDisableDirectRead(struct hdfsFile_internal *file);
+
+    /**
+     * Disable domain socket security checks.
+     *
+     * @return         0 if domain socket security was disabled;
+     *                 -1 if not.
+     */
+    int hdfsDisableDomainSocketSecurity(void); 
+
 #ifdef __cplusplus
 }
 #endif
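
A native test would normally make this call once, before any mini-cluster or
DataNode is started, e.g. (sketch; the surrounding scaffolding is assumed):

    #include "hdfs_test.h"

    #include <stdio.h>

    static int setUpShortCircuitTest(void)
    {
        /* Relax DomainSocket's bind-path validation so that the temporary
         * socket paths used by tests are accepted. */
        if (hdfsDisableDomainSocketSecurity()) {
            fprintf(stderr, "failed to disable domain socket security\n");
            return -1;
        }
        return 0;
    }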

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c?rev=1515906&r1=1515905&r2=1515906&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c Tue Aug 20 18:07:47 2013
@@ -608,3 +608,42 @@ JNIEnv* getJNIEnv(void)
     return env;
 }
 
+int javaObjectIsOfClass(JNIEnv *env, jobject obj, const char *name)
+{
+    jclass clazz;
+    int ret;
+
+    clazz = (*env)->FindClass(env, name);
+    if (!clazz) {
+        printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+            "javaObjectIsOfClass(%s)", name);
+        return -1;
+    }
+    ret = (*env)->IsInstanceOf(env, obj, clazz);
+    (*env)->DeleteLocalRef(env, clazz);
+    return ret == JNI_TRUE ? 1 : 0;
+}
+
+jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
+        const char *key, const char *value)
+{
+    jthrowable jthr;
+    jstring jkey = NULL, jvalue = NULL;
+
+    jthr = newJavaStr(env, key, &jkey);
+    if (jthr)
+        goto done;
+    jthr = newJavaStr(env, value, &jvalue);
+    if (jthr)
+        goto done;
+    jthr = invokeMethod(env, NULL, INSTANCE, jConfiguration,
+            "org/apache/hadoop/conf/Configuration", "set", 
+            "(Ljava/lang/String;Ljava/lang/String;)V",
+            jkey, jvalue);
+    if (jthr)
+        goto done;
+done:
+    (*env)->DeleteLocalRef(env, jkey);
+    (*env)->DeleteLocalRef(env, jvalue);
+    return jthr;
+}

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h?rev=1515906&r1=1515905&r2=1515906&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h Tue Aug 20 18:07:47 2013
@@ -114,6 +114,32 @@ jthrowable classNameOfObject(jobject job
  * */
 JNIEnv* getJNIEnv(void);
 
+/**
+ * Figure out if a Java object is an instance of a particular class.
+ *
+ * @param env  The Java environment.
+ * @param obj  The object to check.
+ * @param name The class name to check.
+ *
+ * @return     -1 if we failed to find the referenced class name.
+ *             0 if the object is not of the given class.
+ *             1 if the object is of the given class.
+ */
+int javaObjectIsOfClass(JNIEnv *env, jobject obj, const char *name);
+
+/**
+ * Set a value in a configuration object.
+ *
+ * @param env               The JNI environment
+ * @param jConfiguration    The configuration object to modify
+ * @param key               The key to modify
+ * @param value             The value to set the key to
+ *
+ * @return                  NULL on success; exception otherwise
+ */
+jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
+        const char *key, const char *value);
+
 #endif /*LIBHDFS_JNI_HELPER_H*/
 
 /**
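
For reference, a sketch of how code inside libhdfs might call the two helpers
declared above; 'env' comes from getJNIEnv(), and both the class name and the
configuration key are the ones already used elsewhere in this patch:

    #include "jni_helper.h"

    #include <jni.h>

    /* Returns 1 if 'obj' is a DFSInputStream, 0 if not, -1 on lookup failure. */
    static int isDfsInputStream(JNIEnv *env, jobject obj)
    {
        return javaObjectIsOfClass(env, obj,
                "org/apache/hadoop/hdfs/DFSInputStream");
    }

    /* Turns on short-circuit reads in a Configuration object; returns the
     * pending exception, or NULL on success, exactly as hadoopConfSetStr does. */
    static jthrowable enableShortCircuit(JNIEnv *env, jobject jConfiguration)
    {
        return hadoopConfSetStr(env, jConfiguration,
                "dfs.client.read.shortcircuit", "true");
    }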

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c?rev=1515906&r1=1515905&r2=1515906&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c Tue Aug 20 18:07:47 2013
@@ -17,14 +17,19 @@
  */
 
 #include "exception.h"
+#include "hdfs.h"
+#include "hdfs_test.h"
 #include "jni_helper.h"
 #include "native_mini_dfs.h"
 
 #include <errno.h>
 #include <jni.h>
+#include <limits.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <sys/types.h>
+#include <unistd.h>
 
 #define MINIDFS_CLUSTER_BUILDER "org/apache/hadoop/hdfs/MiniDFSCluster$Builder"
 #define MINIDFS_CLUSTER "org/apache/hadoop/hdfs/MiniDFSCluster"
@@ -39,8 +44,44 @@ struct NativeMiniDfsCluster {
      * The NativeMiniDfsCluster object
      */
     jobject obj;
+
+    /**
+     * Path to the domain socket, or the empty string if there is none.
+     */
+    char domainSocketPath[PATH_MAX];
 };
 
+static jthrowable nmdConfigureShortCircuit(JNIEnv *env,
+              struct NativeMiniDfsCluster *cl, jobject cobj)
+{
+    jthrowable jthr;
+    char *tmpDir;
+
+    int ret = hdfsDisableDomainSocketSecurity();
+    if (ret) {
+        return newRuntimeError(env, "failed to disable hdfs domain "
+                               "socket security: error %d", ret);
+    }
+    jthr = hadoopConfSetStr(env, cobj, "dfs.client.read.shortcircuit", "true");
+    if (jthr) {
+        return jthr;
+    }
+    tmpDir = getenv("TMPDIR");
+    if (!tmpDir) {
+        tmpDir = "/tmp";
+    }
+    snprintf(cl->domainSocketPath, PATH_MAX, "%s/native_mini_dfs.sock.%d.%d",
+             tmpDir, getpid(), rand());
+    jthr = hadoopConfSetStr(env, cobj, "dfs.domain.socket.path",
+                            cl->domainSocketPath);
+    if (jthr) {
+        return jthr;
+    }
+    return NULL;
+}
+
 struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
 {
     struct NativeMiniDfsCluster* cl = NULL;
@@ -81,6 +122,28 @@ struct NativeMiniDfsCluster* nmdCreate(s
             goto error;
         }
     }
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                              "nmdCreate: Configuration::setBoolean");
+        goto error;
+    }
+    // Disable 'minimum block size' -- it's annoying in tests.
+    (*env)->DeleteLocalRef(env, jconfStr);
+    jconfStr = NULL;
+    jthr = newJavaStr(env, "dfs.namenode.fs-limits.min-block-size", &jconfStr);
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                              "nmdCreate: new String");
+        goto error;
+    }
+    jthr = invokeMethod(env, NULL, INSTANCE, cobj, HADOOP_CONF,
+                        "setLong", "(Ljava/lang/String;J)V", jconfStr, 0LL);
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                              "nmdCreate: Configuration::setLong");
+        goto error;
+    }
+    // Create the MiniDFSCluster builder object
     jthr = constructNewObjectOfClass(env, &bld, MINIDFS_CLUSTER_BUILDER,
                     "(L"HADOOP_CONF";)V", cobj);
     if (jthr) {
@@ -88,6 +151,14 @@ struct NativeMiniDfsCluster* nmdCreate(s
             "nmdCreate: NativeMiniDfsCluster#Builder#Builder");
         goto error;
     }
+    if (conf->configureShortCircuit) {
+        jthr = nmdConfigureShortCircuit(env, cl, cobj);
+        if (jthr) {
+            printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "nmdCreate: nmdConfigureShortCircuit error");
+            goto error;
+        }
+    }
     jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
             "format", "(Z)L" MINIDFS_CLUSTER_BUILDER ";", conf->doFormat);
     if (jthr) {
@@ -272,3 +343,29 @@ error_dlr_nn:
     
     return ret;
 }
+
+int nmdConfigureHdfsBuilder(struct NativeMiniDfsCluster *cl,
+                            struct hdfsBuilder *bld)
+{
+    int port, ret;
+
+    hdfsBuilderSetNameNode(bld, "localhost");
+    port = nmdGetNameNodePort(cl);
+    if (port < 0) {
+      fprintf(stderr, "nmdGetNameNodePort failed with error %d\n", -port);
+      return EIO;
+    }
+    hdfsBuilderSetNameNodePort(bld, port);
+    if (cl->domainSocketPath[0]) {
+      ret = hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit", "true");
+      if (ret) {
+          return ret;
+      }
+      ret = hdfsBuilderConfSetStr(bld, "dfs.domain.socket.path",
+                            cl->domainSocketPath);
+      if (ret) {
+          return ret;
+      }
+    }
+    return 0;
+}
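
Taken together with nmdCreate(), the intended flow for a native short-circuit
test is roughly the following sketch. It assumes the existing libhdfs and
mini-DFS entry points (hdfsNewBuilder, hdfsBuilderConnect, hdfsDisconnect,
nmdWaitClusterUp, nmdShutdown, nmdFree), with error handling abbreviated:

    #include "hdfs.h"
    #include "native_mini_dfs.h"

    int main(void)
    {
        struct NativeMiniDfsConf conf = {
            .doFormat = 1,
            .configureShortCircuit = 1,
        };
        struct NativeMiniDfsCluster *cl;
        struct hdfsBuilder *bld;
        hdfsFS fs;

        cl = nmdCreate(&conf);
        if (!cl)
            return 1;
        if (nmdWaitClusterUp(cl))
            return 1;
        bld = hdfsNewBuilder();
        if (!bld)
            return 1;
        /* Point the builder at the mini cluster; because the cluster was
         * created with configureShortCircuit, this also sets the
         * short-circuit and domain socket configuration keys. */
        if (nmdConfigureHdfsBuilder(cl, bld))
            return 1;
        fs = hdfsBuilderConnect(bld);
        if (!fs)
            return 1;
        /* ... run the actual test against 'fs' ... */
        hdfsDisconnect(fs);
        nmdShutdown(cl);
        nmdFree(cl);
        return 0;
    }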