You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ar...@apache.org on 2013/09/30 20:28:11 UTC

svn commit: r1527697 [1/2] - in /hadoop/common/branches/HDFS-2832/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/ hadoop-hdfs/dev-support/ h...

Author: arp
Date: Mon Sep 30 18:28:07 2013
New Revision: 1527697

URL: http://svn.apache.org/r1527697
Log:
Merging r1526971 through r1527683 from trunk to branch HDFS-2832

Added:
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java
      - copied unchanged from r1527683, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java
      - copied unchanged from r1527683, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c
      - copied unchanged from r1527683, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c
      - copied unchanged from r1527683, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
      - copied unchanged from r1527683, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestNameNodeHttpServer.java
      - copied unchanged from r1527683, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestNameNodeHttpServer.java
Removed:
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/LruCache.java
Modified:
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs_test.h
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java

Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1526971-1527683

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java?rev=1527697&r1=1527696&r2=1527697&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java Mon Sep 30 18:28:07 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -27,59 +28,81 @@ import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
 /**
  * A cache saves DFSClient objects for different users
  */
-public class DFSClientCache {
-  static final Log LOG = LogFactory.getLog(DFSClientCache.class);
-  private final LruCache<String, DFSClient> lruTable;
+class DFSClientCache {
+  private static final Log LOG = LogFactory.getLog(DFSClientCache.class);
+  /**
+   * Cache that maps User id to corresponding DFSClient.
+   */
+  @VisibleForTesting
+  final LoadingCache<String, DFSClient> clientCache;
+
+  final static int DEFAULT_DFS_CLIENT_CACHE_SIZE = 256;
+
   private final Configuration config;
 
-  public DFSClientCache(Configuration config) {
-    // By default, keep 256 DFSClient instance for 256 active users
-    this(config, 256);
+  DFSClientCache(Configuration config) {
+    this(config, DEFAULT_DFS_CLIENT_CACHE_SIZE);
   }
 
-  public DFSClientCache(Configuration config, int size) {
-    lruTable = new LruCache<String, DFSClient>(size);
+  DFSClientCache(Configuration config, int clientCache) {
     this.config = config;
+    this.clientCache = CacheBuilder.newBuilder()
+        .maximumSize(clientCache)
+        .removalListener(clientRemovealListener())
+        .build(clientLoader());
+  }
+
+  private CacheLoader<String, DFSClient> clientLoader() {
+    return new CacheLoader<String, DFSClient>() {
+      @Override
+      public DFSClient load(String userName) throws Exception {
+        UserGroupInformation ugi = UserGroupInformation
+            .createRemoteUser(userName);
+
+        // Guava requires CacheLoader never returns null.
+        return ugi.doAs(new PrivilegedExceptionAction<DFSClient>() {
+          public DFSClient run() throws IOException {
+            return new DFSClient(NameNode.getAddress(config), config);
+          }
+        });
+      }
+    };
+  }
+
+  private RemovalListener<String, DFSClient> clientRemovealListener() {
+    return new RemovalListener<String, DFSClient>() {
+      @Override
+      public void onRemoval(RemovalNotification<String, DFSClient> notification) {
+        DFSClient client = notification.getValue();
+        try {
+          client.close();
+        } catch (IOException e) {
+          LOG.warn(String.format(
+              "IOException when closing the DFSClient(%s), cause: %s", client,
+              e));
+        }
+      }
+    };
   }
 
-  public void put(String uname, DFSClient client) {
-    lruTable.put(uname, client);
-  }
-
-  synchronized public DFSClient get(String uname) {
-    DFSClient client = lruTable.get(uname);
-    if (client != null) {
-      return client;
-    }
-
-    // Not in table, create one.
+  DFSClient get(String userName) {
+    DFSClient client = null;
     try {
-      UserGroupInformation ugi = UserGroupInformation.createRemoteUser(uname);
-      client = ugi.doAs(new PrivilegedExceptionAction<DFSClient>() {
-        public DFSClient run() throws IOException {
-          return new DFSClient(NameNode.getAddress(config), config);
-        }
-      });
-    } catch (IOException e) {
-      LOG.error("Create DFSClient failed for user:" + uname);
-      e.printStackTrace();
-
-    } catch (InterruptedException e) {
-      e.printStackTrace();
+      client = clientCache.get(userName);
+    } catch (ExecutionException e) {
+      LOG.error("Failed to create DFSClient for user:" + userName + " Cause:"
+          + e);
     }
-    // Add new entry
-    lruTable.put(uname, client);
     return client;
   }
-
-  public int usedSize() {
-    return lruTable.usedSize();
-  }
-
-  public boolean containsKey(String key) {
-    return lruTable.containsKey(key);
-  }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1527697&r1=1527696&r2=1527697&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Mon Sep 30 18:28:07 2013
@@ -422,7 +422,7 @@ class OpenFileCtx {
       LOG.warn("Haven't noticed any partial overwrite for a sequential file"
           + " write requests. Treat it as a real random write, no support.");
       response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
-          WriteStableHow.UNSTABLE, 0);
+          WriteStableHow.UNSTABLE, Nfs3Constant.WRITE_COMMIT_VERF);
     } else {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Process perfectOverWrite");
@@ -559,7 +559,7 @@ class OpenFileCtx {
     if (comparator.compare(readbuffer, 0, readCount, data, 0, count) != 0) {
       LOG.info("Perfect overwrite has different content");
       response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
-          stableHow, 0);
+          stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
     } else {
       LOG.info("Perfect overwrite has same content,"
           + " updating the mtime, then return success");
@@ -571,12 +571,12 @@ class OpenFileCtx {
         LOG.info("Got error when processing perfect overwrite, path=" + path
             + " error:" + e);
         return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
-            0);
+            Nfs3Constant.WRITE_COMMIT_VERF);
       }
 
       wccData.setPostOpAttr(postOpAttr);
       response = new WRITE3Response(Nfs3Status.NFS3_OK, wccData, count,
-          stableHow, 0);
+          stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
     }
     return response;
   }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java?rev=1527697&r1=1527696&r2=1527697&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java Mon Sep 30 18:28:07 2013
@@ -17,41 +17,44 @@
  */
 package org.apache.hadoop.hdfs.nfs.nfs3;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 public class TestDFSClientCache {
   @Test
-  public void testLruTable() throws IOException {
-    DFSClientCache cache = new DFSClientCache(new Configuration(), 3);
-    DFSClient client = Mockito.mock(DFSClient.class);
-    cache.put("a", client);
-    assertTrue(cache.containsKey("a"));
-
-    cache.put("b", client);
-    cache.put("c", client);
-    cache.put("d", client);
-    assertTrue(cache.usedSize() == 3);
-    assertFalse(cache.containsKey("a"));
-
-    // Cache should have d,c,b in LRU order
-    assertTrue(cache.containsKey("b"));
-    // Do a lookup to make b the most recently used
-    assertTrue(cache.get("b") != null);
-
-    cache.put("e", client);
-    assertTrue(cache.usedSize() == 3);
-    // c should be replaced with e, and cache has e,b,d
-    assertFalse(cache.containsKey("c"));
-    assertTrue(cache.containsKey("e"));
-    assertTrue(cache.containsKey("b"));
-    assertTrue(cache.containsKey("d"));
+  public void testEviction() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost");
+
+    // Only one entry will be in the cache
+    final int MAX_CACHE_SIZE = 2;
+
+    DFSClientCache cache = new DFSClientCache(conf, MAX_CACHE_SIZE);
+
+    DFSClient c1 = cache.get("test1");
+    assertTrue(cache.get("test1").toString().contains("ugi=test1"));
+    assertEquals(c1, cache.get("test1"));
+    assertFalse(isDfsClientClose(c1));
+
+    cache.get("test2");
+    assertTrue(isDfsClientClose(c1));
+    assertEquals(MAX_CACHE_SIZE - 1, cache.clientCache.size());
+  }
+
+  private static boolean isDfsClientClose(DFSClient c) {
+    try {
+      c.exists("");
+    } catch (IOException e) {
+      return e.getMessage().equals("Filesystem closed");
+    }
+    return false;
   }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1527697&r1=1527696&r2=1527697&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Sep 30 18:28:07 2013
@@ -250,6 +250,9 @@ Release 2.3.0 - UNRELEASED
     HDFS-5122. Support failover and retry in WebHdfsFileSystem for NN HA.
     (Haohui Mai via jing9)
 
+    HDFS-4953. Enable HDFS local reads via mmap.
+    (Colin Patrick McCabe via wang).
+
   IMPROVEMENTS
 
     HDFS-4657.  Limit the number of blocks logged by the NN after a block
@@ -291,6 +294,12 @@ Release 2.3.0 - UNRELEASED
 
     HDFS-5240. Separate formatting from logging in the audit logger API (daryn)
 
+    HDFS-5191. Revisit zero-copy API in FSDataInputStream to make it more
+    intuitive.  (Contributed by Colin Patrick McCabe)
+
+    HDFS-5260. Merge zero-copy memory-mapped HDFS client reads to trunk and
+    branch-2. (cnauroth)
+
   OPTIMIZATIONS
 
     HDFS-5239.  Allow FSNamesystem lock fairness to be configurable (daryn)
@@ -316,6 +325,8 @@ Release 2.3.0 - UNRELEASED
     HDFS-5031. BlockScanner scans the block multiple times. (Vinay via Arpit
     Agarwal)
 
+    HDFS-5266. ElasticByteBufferPool#Key does not implement equals. (cnauroth)
+
 Release 2.2.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -336,11 +347,14 @@ Release 2.1.2 - UNRELEASED
 
   IMPROVEMENTS
 
-  OPTIMIZATIONS
-
     HDFS-5246. Make Hadoop nfs server port and mount daemon port
     configurable. (Jinghui Wang via brandonli)
 
+    HDFS-5256. Use guava LoadingCache to implement DFSClientCache. (Haohui Mai
+    via brandonli)
+
+  OPTIMIZATIONS
+
   BUG FIXES
 
     HDFS-5139. Remove redundant -R option from setrep.
@@ -354,6 +368,11 @@ Release 2.1.2 - UNRELEASED
     HDFS-5186. TestFileJournalManager fails on Windows due to file handle leaks.
     (Chuan Liu via cnauroth)
 
+    HDFS-5268. NFS write commit verifier is not set in a few places (brandonli)
+
+    HDFS-5265. Namenode fails to start when dfs.https.port is unspecified.
+    (Haohui Mai via jing9)
+
 Release 2.1.1-beta - 2013-09-23
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml?rev=1527697&r1=1527696&r2=1527697&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml Mon Sep 30 18:28:07 2013
@@ -330,4 +330,14 @@
        <Method name="setDirInternal" />
        <Bug pattern="DM_STRING_CTOR" />
      </Match>
+    <Match>
+      <Class name="org.apache.hadoop.hdfs.client.ClientMmapManager" />
+      <Method name="create" />
+      <Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH" />
+    </Match>
+    <Match>
+      <Class name="org.apache.hadoop.hdfs.client.ClientMmapManager" />
+      <Method name="create" />
+      <Bug pattern="UL_UNRELEASED_LOCK" />
+    </Match>
  </FindBugsFilter>

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt?rev=1527697&r1=1527696&r2=1527697&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt Mon Sep 30 18:28:07 2013
@@ -142,6 +142,7 @@ target_link_libraries(test_native_mini_d
 )
 
 add_executable(test_libhdfs_threaded
+    main/native/libhdfs/expect.c
     main/native/libhdfs/test_libhdfs_threaded.c
 )
 target_link_libraries(test_libhdfs_threaded
@@ -150,6 +151,16 @@ target_link_libraries(test_libhdfs_threa
     pthread
 )
 
+add_executable(test_libhdfs_zerocopy
+    main/native/libhdfs/expect.c
+    main/native/libhdfs/test/test_libhdfs_zerocopy.c
+)
+target_link_libraries(test_libhdfs_zerocopy
+    hdfs
+    native_mini_dfs
+    pthread
+)
+
 IF(REQUIRE_LIBWEBHDFS)
     add_subdirectory(contrib/libwebhdfs)
 ENDIF(REQUIRE_LIBWEBHDFS)

Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1526971-1527683

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1527697&r1=1527696&r2=1527697&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Mon Sep 30 18:28:07 2013
@@ -20,12 +20,16 @@ package org.apache.hadoop.hdfs;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 
 /**
  * A BlockReader is responsible for reading a single block
  * from a single datanode.
  */
 public interface BlockReader extends ByteBufferReadable {
+  
 
   /* same interface as inputStream java.io.InputStream#read()
    * used by DFSInputStream#read()
@@ -81,4 +85,14 @@ public interface BlockReader extends Byt
    *                      All short-circuit reads are also local.
    */
   boolean isShortCircuit();
+
+  /**
+   * Get a ClientMmap object for this BlockReader.
+   *
+   * @param curBlock      The current block.
+   * @return              The ClientMmap object, or null if mmap is not
+   *                      supported.
+   */
+  ClientMmap getClientMmap(LocatedBlock curBlock,
+        ClientMmapManager mmapManager);
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1527697&r1=1527696&r2=1527697&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Mon Sep 30 18:28:07 2013
@@ -22,11 +22,15 @@ import java.io.DataInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import org.apache.hadoop.conf.Configuration;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.util.DirectBufferPool;
 import org.apache.hadoop.io.IOUtils;
@@ -87,6 +91,8 @@ class BlockReaderLocal implements BlockR
   private final ExtendedBlock block;
   
   private final FileInputStreamCache fisCache;
+  private ClientMmap clientMmap;
+  private boolean mmapDisabled;
   
   private static int getSlowReadBufferNumChunks(int bufSize,
       int bytesPerChecksum) {
@@ -113,6 +119,8 @@ class BlockReaderLocal implements BlockR
     this.datanodeID = datanodeID;
     this.block = block;
     this.fisCache = fisCache;
+    this.clientMmap = null;
+    this.mmapDisabled = false;
 
     // read and handle the common header here. For now just a version
     checksumIn.getChannel().position(0);
@@ -487,6 +495,10 @@ class BlockReaderLocal implements BlockR
 
   @Override
   public synchronized void close() throws IOException {
+    if (clientMmap != null) {
+      clientMmap.unref();
+      clientMmap = null;
+    }
     if (fisCache != null) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("putting FileInputStream for " + filename +
@@ -534,4 +546,30 @@ class BlockReaderLocal implements BlockR
   public boolean isShortCircuit() {
     return true;
   }
+
+  @Override
+  public ClientMmap getClientMmap(LocatedBlock curBlock,
+      ClientMmapManager mmapManager) {
+    if (clientMmap == null) {
+      if (mmapDisabled) {
+        return null;
+      }
+      try {
+        clientMmap = mmapManager.fetch(datanodeID, block, dataIn);
+        if (clientMmap == null) {
+          mmapDisabled = true;
+          return null;
+        }
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted while setting up mmap for " + filename, e);
+        Thread.currentThread().interrupt();
+        return null;
+      } catch (IOException e) {
+        LOG.error("unable to set up mmap for " + filename, e);
+        mmapDisabled = true;
+        return null;
+      }
+    }
+    return clientMmap;
+  }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java?rev=1527697&r1=1527696&r2=1527697&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java Mon Sep 30 18:28:07 2013
@@ -28,6 +28,8 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -35,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.util.DirectBufferPool;
@@ -701,4 +704,10 @@ class BlockReaderLocalLegacy implements 
   public boolean isShortCircuit() {
     return true;
   }
+
+  @Override
+  public ClientMmap getClientMmap(LocatedBlock curBlock,
+      ClientMmapManager mmapManager) {
+    return null;
+  }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1527697&r1=1527696&r2=1527697&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Mon Sep 30 18:28:07 2013
@@ -104,6 +104,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -206,7 +207,43 @@ public class DFSClient implements java.i
   private boolean shouldUseLegacyBlockReaderLocal;
   private final CachingStrategy defaultReadCachingStrategy;
   private final CachingStrategy defaultWriteCachingStrategy;
+  private ClientMmapManager mmapManager;
   
+  private static final ClientMmapManagerFactory MMAP_MANAGER_FACTORY =
+      new ClientMmapManagerFactory();
+
+  private static final class ClientMmapManagerFactory {
+    private ClientMmapManager mmapManager = null;
+    /**
+     * Tracks the number of users of mmapManager.
+     */
+    private int refcnt = 0;
+    // Returns the shared manager, creating it from conf on the first reference.
+    synchronized ClientMmapManager get(Configuration conf) {
+      if (refcnt++ == 0) {
+        mmapManager = ClientMmapManager.fromConf(conf);
+      } else {
+        String mismatches = mmapManager.verifyConfigurationMatches(conf);
+        if (!mismatches.isEmpty()) {
+          LOG.warn("The ClientMmapManager settings you specified " +
+            "have been ignored because another thread created the " +
+            "ClientMmapManager first.  " + mismatches);
+        }
+      }
+      return mmapManager;
+    }
+    // Drops one reference; releasing the last one cleans up the shared manager.
+    synchronized void unref(ClientMmapManager mmapManager) {
+      if (this.mmapManager != mmapManager) {
+        throw new IllegalArgumentException();
+      }
+      if (--refcnt == 0) {
+        IOUtils.cleanup(LOG, mmapManager);
+        this.mmapManager = null; // clear the field; the parameter shadows it
+      }
+    }
+  }
+
   /**
    * DFSClient configuration 
    */
@@ -534,6 +571,7 @@ public class DFSClient implements java.i
         new CachingStrategy(readDropBehind, readahead);
     this.defaultWriteCachingStrategy =
         new CachingStrategy(writeDropBehind, readahead);
+    this.mmapManager = MMAP_MANAGER_FACTORY.get(conf);
   }
   
   /**
@@ -738,9 +776,12 @@ public class DFSClient implements java.i
   
   /** Abort and release resources held.  Ignore all errors. */
   void abort() {
+    if (mmapManager != null) {
+      MMAP_MANAGER_FACTORY.unref(mmapManager);
+      mmapManager = null;
+    }
     clientRunning = false;
     closeAllFilesBeingWritten(true);
-
     try {
       // remove reference to this client and stop the renewer,
       // if there is no more clients under the renewer.
@@ -784,6 +825,10 @@ public class DFSClient implements java.i
    */
   @Override
   public synchronized void close() throws IOException {
+    if (mmapManager != null) {
+      MMAP_MANAGER_FACTORY.unref(mmapManager);
+      mmapManager = null;
+    }
     if(clientRunning) {
       closeAllFilesBeingWritten(false);
       clientRunning = false;
@@ -2496,4 +2541,9 @@ public class DFSClient implements java.i
   public CachingStrategy getDefaultWriteCachingStrategy() {
     return defaultWriteCachingStrategy;
   }
+
+  /**
+   * @return the ClientMmapManager this client obtained from
+   *         MMAP_MANAGER_FACTORY, or null once abort() or close() has
+   *         released it.
+   */
+  @VisibleForTesting
+  public ClientMmapManager getMmapManager() {
+    return this.mmapManager;
+  }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1527697&r1=1527696&r2=1527697&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Mon Sep 30 18:28:07 2013
@@ -376,6 +376,12 @@ public class DFSConfigKeys extends Commo
   public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT = 1024 * 1024;
   public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = "dfs.client.domain.socket.data.traffic";
   public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false;
+  // Settings for the client-side mmap cache.
+  public static final String DFS_CLIENT_MMAP_CACHE_SIZE = "dfs.client.mmap.cache.size";
+  public static final int DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT = 1024;
+  public static final String DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS = "dfs.client.mmap.cache.timeout.ms";
+  public static final long DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT  = 15 * 60 * 1000;
+  // This key must differ from DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS; otherwise
+  // both settings would be read from the same configuration property.
+  public static final String DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT = "dfs.client.mmap.cache.thread.runs.per.timeout";
+  public static final int DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT  = 4;
 
   // property for fsimage compression
   public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1527697&r1=1527696&r2=1527697&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Mon Sep 30 18:28:07 2013
@@ -24,6 +24,7 @@ import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -36,11 +37,15 @@ import java.util.concurrent.ConcurrentHa
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.ByteBufferUtil;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CanSetReadahead;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
+import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.client.ClientMmap;
 import org.apache.hadoop.hdfs.net.DomainPeer;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
@@ -54,12 +59,14 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
+import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.IdentityHashStore;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -69,7 +76,8 @@ import com.google.common.annotations.Vis
  ****************************************************************/
 @InterfaceAudience.Private
 public class DFSInputStream extends FSInputStream
-implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
+implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
+    HasEnhancedByteBufferAccess {
   @VisibleForTesting
   static boolean tcpReadsDisabledForTesting = false;
   private final PeerCache peerCache;
@@ -87,17 +95,28 @@ implements ByteBufferReadable, CanSetDro
   private CachingStrategy cachingStrategy;
   private final ReadStatistics readStatistics = new ReadStatistics();
 
+  /**
+   * Track the ByteBuffers that we have handed out to readers.
+   * 
+   * The value type can be either ByteBufferPool or ClientMmap, depending on
+   * whether this is a memory-mapped buffer or not.
+   */
+  private final IdentityHashStore<ByteBuffer, Object>
+      extendedReadBuffers = new IdentityHashStore<ByteBuffer, Object>(0);
+
   public static class ReadStatistics {
     public ReadStatistics() {
       this.totalBytesRead = 0;
       this.totalLocalBytesRead = 0;
       this.totalShortCircuitBytesRead = 0;
+      this.totalZeroCopyBytesRead = 0;
     }
 
     public ReadStatistics(ReadStatistics rhs) {
       this.totalBytesRead = rhs.getTotalBytesRead();
       this.totalLocalBytesRead = rhs.getTotalLocalBytesRead();
       this.totalShortCircuitBytesRead = rhs.getTotalShortCircuitBytesRead();
+      this.totalZeroCopyBytesRead = rhs.getTotalZeroCopyBytesRead();
     }
 
     /**
@@ -123,6 +142,13 @@ implements ByteBufferReadable, CanSetDro
     public long getTotalShortCircuitBytesRead() {
       return totalShortCircuitBytesRead;
     }
+    
+    /**
+     * @return The running total of bytes read through the zero-copy path,
+     *         as accumulated by addZeroCopyBytes.
+     */
+    public long getTotalZeroCopyBytesRead() {
+      return this.totalZeroCopyBytesRead;
+    }
 
     /**
      * @return The total number of bytes read which were not local.
@@ -145,12 +171,21 @@ implements ByteBufferReadable, CanSetDro
       this.totalLocalBytesRead += amt;
       this.totalShortCircuitBytesRead += amt;
     }
+
+    /**
+     * Record amt bytes read via the zero-copy path.  The same amount is
+     * also added to the total, local, and short-circuit byte counters.
+     */
+    void addZeroCopyBytes(long amt) {
+      this.totalZeroCopyBytesRead += amt;
+      this.totalShortCircuitBytesRead += amt;
+      this.totalLocalBytesRead += amt;
+      this.totalBytesRead += amt;
+    }
     
     private long totalBytesRead;
 
     private long totalLocalBytesRead;
 
     private long totalShortCircuitBytesRead;
+
+    private long totalZeroCopyBytesRead;
   }
   
   private final FileInputStreamCache fileInputStreamCache;
@@ -587,6 +622,20 @@ implements ByteBufferReadable, CanSetDro
     }
     dfsClient.checkOpen();
 
+    if (!extendedReadBuffers.isEmpty()) {
+      final StringBuilder builder = new StringBuilder();
+      extendedReadBuffers.visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() {
+        private String prefix = "";
+        @Override
+        public void accept(ByteBuffer k, Object v) {
+          builder.append(prefix).append(k);
+          prefix = ", ";
+        }
+      });
+      DFSClient.LOG.warn("closing file " + src + ", but there are still " +
+          "unreleased ByteBuffers allocated by read().  " +
+          "Please release " + builder.toString() + ".");
+    }
     if (blockReader != null) {
       blockReader.close();
       blockReader = null;
@@ -1393,4 +1442,100 @@ implements ByteBufferReadable, CanSetDro
     this.cachingStrategy.setDropBehind(dropBehind);
     closeCurrentBlockReader();
   }
+
+  /**
+   * Read up to maxLength bytes into a ByteBuffer, preferring the zero-copy
+   * path when the caller allows checksums to be skipped.
+   *
+   * Buffers produced by the fallback path are recorded in
+   * extendedReadBuffers together with the pool they came from.
+   *
+   * @param bufferPool  Pool handed to ByteBufferUtil.fallbackRead when a
+   *                    zero-copy read is not performed.
+   * @param maxLength   Maximum number of bytes to read; must be positive.
+   * @param opts        Read options; SKIP_CHECKSUMS enables the zero-copy
+   *                    attempt.
+   * @return            The buffer holding the data, or whatever
+   *                    fallbackRead returned (possibly null).
+   * @throws IOException if a new BlockReader could not be obtained.
+   */
+  @Override
+  public synchronized ByteBuffer read(ByteBufferPool bufferPool,
+      int maxLength, EnumSet<ReadOption> opts)
+          throws IOException, UnsupportedOperationException {
+    assert(maxLength > 0);
+    final boolean needNewReader = (blockReader == null) || (blockEnd == -1);
+    if (needNewReader && (pos < getFileLength())) {
+      // There is no usable blockReader but the file has more data, so ask
+      // seekToBlockSource for a new reader; it also recalculates blockEnd.
+      if (!seekToBlockSource(pos) || (blockReader == null)) {
+        throw new IOException("failed to allocate new BlockReader " +
+            "at position " + pos);
+      }
+    }
+    if (opts.contains(ReadOption.SKIP_CHECKSUMS)) {
+      ByteBuffer mapped = tryReadZeroCopy(maxLength);
+      if (mapped != null) {
+        return mapped;
+      }
+    }
+    ByteBuffer copied = ByteBufferUtil.fallbackRead(this, bufferPool,
+        maxLength);
+    if (copied == null) {
+      return null;
+    }
+    extendedReadBuffers.put(copied, bufferPool);
+    return copied;
+  }
+
+  /**
+   * Try to satisfy a read by handing out a read-only view of the memory map
+   * for the current block.
+   *
+   * On success the stream position is advanced, the ClientMmap gains a
+   * reference, and the buffer is recorded in extendedReadBuffers so that
+   * releaseBuffer can drop that reference later.
+   *
+   * @param maxLength  The maximum number of bytes to return.
+   * @return           A buffer positioned over the bytes read, or null if a
+   *                   zero-copy read is not possible here.
+   * @throws IOException if seeking past the bytes read fails.
+   */
+  private synchronized ByteBuffer tryReadZeroCopy(int maxLength)
+      throws IOException {
+    // Java ByteBuffers can't be longer than 2 GB, because they use
+    // 4-byte signed integers to represent capacity, etc.
+    // So we can't mmap the parts of the block higher than the 2 GB offset.
+    // FIXME: we could work around this with multiple memory maps.
+    // See HDFS-5101.
+    long blockEnd32 = Math.min(Integer.MAX_VALUE, blockEnd);
+    long curPos = pos;
+    long blockLeft = blockEnd32 - curPos + 1;
+    if (blockLeft <= 0) {
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
+          curPos + " of " + src + "; blockLeft = " + blockLeft +
+          "; blockEnd32 = " + blockEnd32 + ", blockEnd = " + blockEnd +
+          "; maxLength = " + maxLength);
+      }
+      return null;
+    }
+    // Take the minimum as a long: blockLeft can be as large as 2^31, which
+    // would wrap to a negative value if it were narrowed to int first.
+    int length = (int)Math.min(blockLeft, (long)maxLength);
+    long blockStartInFile = currentLocatedBlock.getStartOffset();
+    long blockPos = curPos - blockStartInFile;
+    long limit = blockPos + length;
+    ClientMmap clientMmap =
+        blockReader.getClientMmap(currentLocatedBlock,
+            dfsClient.getMmapManager());
+    if (clientMmap == null) {
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
+          curPos + " of " + src + "; BlockReader#getClientMmap returned " +
+          "null.");
+      }
+      return null;
+    }
+    seek(pos + length);
+    ByteBuffer buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
+    buffer.position((int)blockPos);
+    buffer.limit((int)limit);
+    clientMmap.ref();
+    extendedReadBuffers.put(buffer, clientMmap);
+    readStatistics.addZeroCopyBytes(length);
+    if (DFSClient.LOG.isDebugEnabled()) {
+      // Report the number of bytes actually handed out, which may be less
+      // than the maxLength that was requested.
+      DFSClient.LOG.debug("readZeroCopy read " + length + " bytes from " +
+          "offset " + curPos + " via the zero-copy read path.  " +
+          "blockEnd = " + blockEnd);
+    }
+    return buffer;
+  }
+
+  /**
+   * Give back a buffer that this stream handed out through
+   * read(ByteBufferPool, int, EnumSet).
+   *
+   * A memory-mapped buffer drops its reference on the ClientMmap; a pooled
+   * buffer is returned to the ByteBufferPool it was recorded with.
+   *
+   * @param buffer  The buffer to release.
+   * @throws IllegalArgumentException if the buffer is not tracked by this
+   *                                  stream.
+   */
+  @Override
+  public synchronized void releaseBuffer(ByteBuffer buffer) {
+    final Object owner = extendedReadBuffers.remove(buffer);
+    if (owner == null) {
+      throw new IllegalArgumentException("tried to release a buffer " +
+          "that was not created by this stream, " + buffer);
+    }
+    if (owner instanceof ClientMmap) {
+      ClientMmap mmap = (ClientMmap)owner;
+      mmap.unref();
+      return;
+    }
+    if (owner instanceof ByteBufferPool) {
+      ByteBufferPool pool = (ByteBufferPool)owner;
+      pool.putBuffer(buffer);
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1527697&r1=1527696&r2=1527697&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Mon Sep 30 18:28:07 2013
@@ -27,9 +27,12 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -485,4 +488,10 @@ public class RemoteBlockReader extends F
   public boolean isShortCircuit() {
     return false;
   }
+
+  /**
+   * This block reader does not provide a memory map; it always returns null.
+   */
+  @Override
+  public ClientMmap getClientMmap(LocatedBlock curBlock,
+      ClientMmapManager mmapManager) {
+    return null;
+  }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1527697&r1=1527696&r2=1527697&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Mon Sep 30 18:28:07 2013
@@ -29,9 +29,12 @@ import java.nio.channels.ReadableByteCha
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
@@ -40,7 +43,6 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@@ -451,4 +453,10 @@ public class RemoteBlockReader2  impleme
   public boolean isShortCircuit() {
     return false;
   }
+
+  /**
+   * This block reader does not provide a memory map; it always returns null.
+   */
+  @Override
+  public ClientMmap getClientMmap(LocatedBlock curBlock,
+      ClientMmapManager manager) {
+    return null;
+  }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java?rev=1527697&r1=1527696&r2=1527697&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java Mon Sep 30 18:28:07 2013
@@ -100,7 +100,7 @@ public class NameNodeHttpServer {
     if (certSSL) {
       boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false);
       InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(infoHost + ":" + conf.get(
-        DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, infoHost + ":" + 0));
+        DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, "0"));
       Configuration sslConf = new Configuration(false);
       if (certSSL) {
         sslConf.addResource(conf.get(DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,

Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1526971-1527683

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h?rev=1527697&r1=1527696&r2=1527697&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h Mon Sep 30 18:28:07 2013
@@ -19,16 +19,19 @@
 #ifndef LIBHDFS_NATIVE_TESTS_EXPECT_H
 #define LIBHDFS_NATIVE_TESTS_EXPECT_H
 
+#include <inttypes.h>
 #include <stdio.h>
 
+struct hdfsFile_internal;
+
 #define EXPECT_ZERO(x) \
     do { \
         int __my_ret__ = x; \
         if (__my_ret__) { \
             int __my_errno__ = errno; \
-            fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
 		    "code %d (errno: %d): got nonzero from %s\n", \
-		    __LINE__, __my_ret__, __my_errno__, #x); \
+		    __FILE__, __LINE__, __my_ret__, __my_errno__, #x); \
             return __my_ret__; \
         } \
     } while (0);
@@ -38,9 +41,9 @@
         void* __my_ret__ = x; \
         int __my_errno__ = errno; \
         if (__my_ret__ != NULL) { \
-            fprintf(stderr, "TEST_ERROR: failed on line %d (errno: %d): " \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d (errno: %d): " \
 		    "got non-NULL value %p from %s\n", \
-		    __LINE__, __my_errno__, __my_ret__, #x); \
+		    __FILE__, __LINE__, __my_errno__, __my_ret__, #x); \
             return -1; \
         } \
     } while (0);
@@ -50,8 +53,8 @@
         void* __my_ret__ = x; \
         int __my_errno__ = errno; \
         if (__my_ret__ == NULL) { \
-            fprintf(stderr, "TEST_ERROR: failed on line %d (errno: %d): " \
-		    "got NULL from %s\n", __LINE__, __my_errno__, #x); \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d (errno: %d): " \
+		    "got NULL from %s\n", __FILE__, __LINE__, __my_errno__, #x); \
             return -1; \
         } \
     } while (0);
@@ -61,15 +64,16 @@
         int __my_ret__ = x; \
         int __my_errno__ = errno; \
         if (__my_ret__ != -1) { \
-            fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
-                "code %d (errno: %d): expected -1 from %s\n", __LINE__, \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
+                "code %d (errno: %d): expected -1 from %s\n", \
+                    __FILE__, __LINE__, \
                 __my_ret__, __my_errno__, #x); \
             return -1; \
         } \
         if (__my_errno__ != e) { \
-            fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
                 "code %d (errno: %d): expected errno = %d from %s\n", \
-                __LINE__, __my_ret__, __my_errno__, e, #x); \
+                __FILE__, __LINE__, __my_ret__, __my_errno__, e, #x); \
             return -1; \
 	} \
     } while (0);
@@ -79,9 +83,9 @@
         int __my_ret__ = x; \
         int __my_errno__ = errno; \
         if (!__my_ret__) { \
-            fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
-		    "code %d (errno: %d): got zero from %s\n", __LINE__, \
-                __my_ret__, __my_errno__, #x); \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
+              "code %d (errno: %d): got zero from %s\n", __FILE__, __LINE__, \
+              __my_ret__, __my_errno__, #x); \
             return -1; \
         } \
     } while (0);
@@ -91,9 +95,9 @@
         int __my_ret__ = x; \
         int __my_errno__ = errno; \
         if (__my_ret__ < 0) { \
-            fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
                 "code %d (errno: %d): got negative return from %s\n", \
-		    __LINE__, __my_ret__, __my_errno__, #x); \
+                __FILE__, __LINE__, __my_ret__, __my_errno__, #x); \
             return __my_ret__; \
         } \
     } while (0);
@@ -103,9 +107,21 @@
         int __my_ret__ = y; \
         int __my_errno__ = errno; \
         if (__my_ret__ != (x)) { \
-            fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
               "code %d (errno: %d): expected %d\n", \
-               __LINE__, __my_ret__, __my_errno__, (x)); \
+               __FILE__, __LINE__, __my_ret__, __my_errno__, (x)); \
+            return -1; \
+        } \
+    } while (0);
+
+/*
+ * Fail the enclosing test function (return -1) unless the 64-bit value of y
+ * equals x.  x is cast to int64_t for printing because an argument of any
+ * other width would not match the PRId64 conversion.
+ */
+#define EXPECT_INT64_EQ(x, y) \
+    do { \
+        int64_t __my_ret__ = y; \
+        int __my_errno__ = errno; \
+        if (__my_ret__ != (x)) { \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
+              "value %"PRId64" (errno: %d): expected %"PRId64"\n", \
+               __FILE__, __LINE__, __my_ret__, __my_errno__, (int64_t)(x)); \
             return -1; \
         } \
     } while (0);
@@ -117,4 +133,17 @@
     ret = -errno; \
     } while (ret == -EINTR);
 
+/**
+ * Test that an HDFS file has the given statistics.
+ *
+ * Any parameter can be set to UINT64_MAX to avoid checking it.
+ *
+ * @param file                                The file to check
+ * @param expectedTotalBytesRead              Expected total bytes read
+ * @param expectedTotalLocalBytesRead         Expected local bytes read
+ * @param expectedTotalShortCircuitBytesRead  Expected short-circuit bytes
+ *                                            read
+ * @param expectedTotalZeroCopyBytesRead      Expected zero-copy bytes read
+ *
+ * @return 0 on success; error code otherwise
+ */
+int expectFileStats(struct hdfsFile_internal *file,
+      uint64_t expectedTotalBytesRead,
+      uint64_t expectedTotalLocalBytesRead,
+      uint64_t expectedTotalShortCircuitBytesRead,
+      uint64_t expectedTotalZeroCopyBytesRead);
+
 #endif

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c?rev=1527697&r1=1527696&r2=1527697&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c Mon Sep 30 18:28:07 2013
@@ -39,6 +39,7 @@
 #define JAVA_NET_ISA    "java/net/InetSocketAddress"
 #define JAVA_NET_URI    "java/net/URI"
 #define JAVA_STRING     "java/lang/String"
+#define READ_OPTION     "org/apache/hadoop/fs/ReadOption"
 
 #define JAVA_VOID       "V"
 
@@ -143,6 +144,15 @@ int hdfsFileGetReadStatistics(hdfsFile f
         goto done;
     }
     s->totalShortCircuitBytesRead = jVal.j;
+    jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
+                  "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
+                  "getTotalZeroCopyBytesRead", "()J");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "hdfsFileGetReadStatistics: getTotalZeroCopyBytesRead failed");
+        goto done;
+    }
+    s->totalZeroCopyBytesRead = jVal.j;
     *stats = s;
     s = NULL;
     ret = 0;
@@ -183,6 +193,25 @@ void hdfsFileDisableDirectRead(hdfsFile 
     file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ;
 }
 
+/**
+ * Invoke the static Java method DomainSocket#disableBindPathValidation.
+ *
+ * @return 0 on success; -1 with errno set on failure.
+ */
+int hdfsDisableDomainSocketSecurity(void)
+{
+    JNIEnv *env = getJNIEnv();
+    jthrowable exc;
+
+    if (!env) {
+        errno = EINTERNAL;
+        return -1;
+    }
+    exc = invokeMethod(env, NULL, STATIC, NULL,
+            "org/apache/hadoop/net/unix/DomainSocket",
+            "disableBindPathValidation", "()V");
+    if (!exc) {
+        return 0;
+    }
+    errno = printExceptionAndFree(env, exc, PRINT_EXC_ALL,
+        "DomainSocket#disableBindPathValidation");
+    return -1;
+}
+
 /**
  * hdfsJniEnv: A wrapper struct to be used as 'value'
  * while saving thread -> JNIEnv* mappings
@@ -220,40 +249,6 @@ static jthrowable constructNewObjectOfPa
     return NULL;
 }
 
-/**
- * Set a configuration value.
- *
- * @param env               The JNI environment
- * @param jConfiguration    The configuration object to modify
- * @param key               The key to modify
- * @param value             The value to set the key to
- *
- * @return                  NULL on success; exception otherwise
- */
-static jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
-        const char *key, const char *value)
-{
-    jthrowable jthr;
-    jstring jkey = NULL, jvalue = NULL;
-
-    jthr = newJavaStr(env, key, &jkey);
-    if (jthr)
-        goto done;
-    jthr = newJavaStr(env, value, &jvalue);
-    if (jthr)
-        goto done;
-    jthr = invokeMethod(env, NULL, INSTANCE, jConfiguration,
-            HADOOP_CONF, "set", JMETHOD2(JPARAM(JAVA_STRING),
-                                         JPARAM(JAVA_STRING), JAVA_VOID),
-            jkey, jvalue);
-    if (jthr)
-        goto done;
-done:
-    destroyLocalReference(env, jkey);
-    destroyLocalReference(env, jvalue);
-    return jthr;
-}
-
 static jthrowable hadoopConfGetStr(JNIEnv *env, jobject jConfiguration,
         const char *key, char **val)
 {
@@ -2108,6 +2103,395 @@ int hdfsUtime(hdfsFS fs, const char* pat
     return 0;
 }
 
+/**
+ * Zero-copy options.
+ *
+ * We cache the EnumSet of ReadOptions which has to be passed into every
+ * readZero call, to avoid reconstructing it each time.  This cache is cleared
+ * whenever an element changes.
+ */
+struct hadoopRzOptions
+{
+    // NOTE(review): no code visible here assigns this field; confirm that
+    // it is needed.
+    JNIEnv *env;
+    // Nonzero if SKIP_CHECKSUMS should be included in the ReadOption set.
+    int skipChecksums;
+    // The ByteBufferPool object created by hadoopRzOptionsSetByteBufferPool,
+    // or NULL if none has been set.
+    jobject byteBufferPool;
+    // Global reference to the cached EnumSet of ReadOptions, or NULL if it
+    // has not been built since the last change.
+    jobject cachedEnumSet;
+};
+
+/**
+ * Allocate a zeroed hadoopRzOptions structure.
+ *
+ * @return  The new structure, or NULL with errno set (EINTERNAL if no JNI
+ *          environment is available, ENOMEM if allocation fails).
+ */
+struct hadoopRzOptions *hadoopRzOptionsAlloc(void)
+{
+    struct hadoopRzOptions *newOpts;
+
+    // The JNIEnv itself is not used here; this only checks that the JNI
+    // environment is set up properly before handing out options.
+    if (!getJNIEnv()) {
+        errno = EINTERNAL;
+        return NULL;
+    }
+    newOpts = calloc(1, sizeof(*newOpts));
+    if (!newOpts) {
+        errno = ENOMEM;
+    }
+    return newOpts;
+}
+
+/**
+ * Release the cached EnumSet global reference held by opts, if there is one.
+ */
+static void hadoopRzOptionsClearCached(JNIEnv *env,
+        struct hadoopRzOptions *opts)
+{
+    if (opts->cachedEnumSet) {
+        (*env)->DeleteGlobalRef(env, opts->cachedEnumSet);
+        opts->cachedEnumSet = NULL;
+    }
+}
+
+/**
+ * Set whether zero-copy reads may skip checksums.
+ *
+ * @param opts  The options structure to modify.
+ * @param skip  Nonzero to skip checksums; zero otherwise.
+ *
+ * @return      0 on success; -1 with errno set to EINTERNAL if no JNI
+ *              environment is available.
+ */
+int hadoopRzOptionsSetSkipChecksum(
+        struct hadoopRzOptions *opts, int skip)
+{
+    JNIEnv *env = getJNIEnv();
+
+    if (!env) {
+        errno = EINTERNAL;
+        return -1;
+    }
+    // The cached EnumSet reflects the previous setting, so drop it first.
+    hadoopRzOptionsClearCached(env, opts);
+    opts->skipChecksums = skip ? 1 : 0;
+    return 0;
+}
+
+/**
+ * Create a ByteBufferPool of the given class and store it in opts,
+ * replacing any pool that was set before.
+ *
+ * @param opts       The options structure to modify.
+ * @param className  JNI name of a class with a no-argument constructor.
+ *
+ * @return           0 on success; -1 with errno set on failure (EINTERNAL
+ *                   if no JNI environment is available, EINVAL if the
+ *                   object could not be constructed, ENOMEM if no global
+ *                   reference could be created).
+ */
+int hadoopRzOptionsSetByteBufferPool(
+        struct hadoopRzOptions *opts, const char *className)
+{
+    JNIEnv *env;
+    jthrowable jthr;
+    jobject byteBufferPool = NULL, globalByteBufferPool = NULL;
+
+    env = getJNIEnv();
+    if (!env) {
+        errno = EINTERNAL;
+        return -1;
+    }
+
+    // Note: we don't have to call hadoopRzOptionsClearCached in this
+    // function, since the ByteBufferPool is passed separately from the
+    // EnumSet of ReadOptions.
+
+    jthr = constructNewObjectOfClass(env, &byteBufferPool, className, "()V");
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "hadoopRzOptionsSetByteBufferPool(className=%s): ", className);
+        errno = EINVAL;
+        return -1;
+    }
+    // opts outlives this call and the pool is later released with
+    // DeleteGlobalRef, so the reference kept in opts must be a global one.
+    // The reference from constructNewObjectOfClass is presumably local
+    // (confirm in jni_helper.c), so promote it and drop the local.
+    globalByteBufferPool = (*env)->NewGlobalRef(env, byteBufferPool);
+    (*env)->DeleteLocalRef(env, byteBufferPool);
+    if (!globalByteBufferPool) {
+        (*env)->ExceptionClear(env);
+        fprintf(stderr, "hadoopRzOptionsSetByteBufferPool(className=%s): "
+                "failed to create a global reference\n", className);
+        errno = ENOMEM;
+        return -1;
+    }
+    if (opts->byteBufferPool) {
+        // Delete any previous ByteBufferPool we had.
+        (*env)->DeleteGlobalRef(env, opts->byteBufferPool);
+    }
+    opts->byteBufferPool = globalByteBufferPool;
+    return 0;
+}
+
+/**
+ * Free an options structure and the Java references it holds.
+ *
+ * @param opts  The structure to free; NULL is accepted and ignored.
+ */
+void hadoopRzOptionsFree(struct hadoopRzOptions *opts)
+{
+    JNIEnv *env;
+
+    if (!opts) {
+        return;
+    }
+    env = getJNIEnv();
+    if (env) {
+        hadoopRzOptionsClearCached(env, opts);
+        if (opts->byteBufferPool) {
+            (*env)->DeleteGlobalRef(env, opts->byteBufferPool);
+            opts->byteBufferPool = NULL;
+        }
+    }
+    // Without a JNIEnv the Java references cannot be released, but the
+    // structure itself is plain heap memory and must not be leaked.
+    free(opts);
+}
+
+struct hadoopRzBuffer
+{
+    // The java.nio.ByteBuffer object this buffer wraps.
+    jobject byteBuffer;
+    // Start of the readable data: an address inside the direct buffer, or a
+    // malloc'ed copy of the backing array for indirect buffers.
+    uint8_t *ptr;
+    // Number of readable bytes, taken from ByteBuffer#remaining.
+    int32_t length;
+    // Set to 1 when ptr points into a direct buffer.
+    int direct;
+};
+
+/**
+ * Get the EnumSet of ReadOptions described by opts, building and caching it
+ * on first use.
+ *
+ * @param env       The JNI environment
+ * @param opts      The options; opts->cachedEnumSet is filled in on success
+ * @param enumSet   (out param) the EnumSet; the global reference stays
+ *                  owned by opts
+ *
+ * @return          NULL on success; exception otherwise
+ */
+static jthrowable hadoopRzOptionsGetEnumSet(JNIEnv *env,
+        struct hadoopRzOptions *opts, jobject *enumSet)
+{
+    jthrowable jthr = NULL;
+    jobject enumInst = NULL, enumSetObj = NULL;
+    jclass clazz = NULL;
+    jvalue jVal;
+
+    if (opts->cachedEnumSet) {
+        // If we cached the value, return it now.
+        *enumSet = opts->cachedEnumSet;
+        goto done;
+    }
+    if (opts->skipChecksums) {
+        jthr = fetchEnumInstance(env, READ_OPTION,
+                  "SKIP_CHECKSUMS", &enumInst);
+        if (jthr) {
+            goto done;
+        }
+        jthr = invokeMethod(env, &jVal, STATIC, NULL,
+                "java/util/EnumSet", "of",
+                "(Ljava/lang/Enum;)Ljava/util/EnumSet;", enumInst);
+        if (jthr) {
+            goto done;
+        }
+        enumSetObj = jVal.l;
+    } else {
+        clazz = (*env)->FindClass(env, READ_OPTION);
+        if (!clazz) {
+            jthr = newRuntimeError(env, "failed "
+                    "to find class for %s", READ_OPTION);
+            goto done;
+        }
+        jthr = invokeMethod(env, &jVal, STATIC, NULL,
+                "java/util/EnumSet", "noneOf",
+                "(Ljava/lang/Class;)Ljava/util/EnumSet;", clazz);
+        if (jthr) {
+            // jVal was not filled in, so it must not be used below.
+            goto done;
+        }
+        enumSetObj = jVal.l;
+    }
+    // create global ref
+    opts->cachedEnumSet = (*env)->NewGlobalRef(env, enumSetObj);
+    if (!opts->cachedEnumSet) {
+        jthr = getPendingExceptionAndClear(env);
+        goto done;
+    }
+    *enumSet = opts->cachedEnumSet;
+    jthr = NULL;
+done:
+    (*env)->DeleteLocalRef(env, enumInst);
+    (*env)->DeleteLocalRef(env, enumSetObj);
+    (*env)->DeleteLocalRef(env, clazz);
+    return jthr;
+}
+
+/**
+ * Fill in the length, ptr, and direct fields of a hadoopRzBuffer from the
+ * ByteBuffer that it already references.
+ *
+ * If the ByteBuffer is direct, ptr points into the ByteBuffer's own memory.
+ * Otherwise, the remaining bytes of the backing array are copied into a
+ * malloc'ed region, and ptr points to that copy.  hadoopRzBufferFree frees
+ * the copy.
+ *
+ * @param env      The JNI environment.
+ * @param opts     The read options.  Only byteBufferPool is consulted.
+ * @param buffer   The buffer to fill in.  buffer->byteBuffer must already
+ *                 be set.
+ *
+ * @return         0 on success; a nonzero error code on failure.
+ */
+static int hadoopReadZeroExtractBuffer(JNIEnv *env,
+        const struct hadoopRzOptions *opts, struct hadoopRzBuffer *buffer)
+{
+    int ret;
+    jthrowable jthr;
+    jvalue jVal;
+    uint8_t *directStart;
+    void *mallocBuf = NULL;
+    jint position;
+    jarray array = NULL;
+
+    jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
+                     "java/nio/ByteBuffer", "remaining", "()I");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hadoopReadZeroExtractBuffer: ByteBuffer#remaining failed: ");
+        goto done;
+    }
+    buffer->length = jVal.i;
+    jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
+                     "java/nio/ByteBuffer", "position", "()I");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hadoopReadZeroExtractBuffer: ByteBuffer#position failed: ");
+        goto done;
+    }
+    position = jVal.i;
+    directStart = (*env)->GetDirectBufferAddress(env, buffer->byteBuffer);
+    if (directStart) {
+        // Handle direct buffers.
+        buffer->ptr = directStart + position;
+        buffer->direct = 1;
+        ret = 0;
+        goto done;
+    }
+    // Handle indirect buffers.
+    // The JNI docs don't say that GetDirectBufferAddress throws any exceptions
+    // when it fails.  However, they also don't clearly say that it doesn't.  It
+    // seems safest to clear any pending exceptions here, to prevent problems on
+    // various JVMs.
+    (*env)->ExceptionClear(env);
+    if (!opts->byteBufferPool) {
+        fputs("hadoopReadZeroExtractBuffer: we read through the "
+                "zero-copy path, but failed to get the address of the buffer via "
+                "GetDirectBufferAddress.  Please make sure your JVM supports "
+                "GetDirectBufferAddress.\n", stderr);
+        ret = ENOTSUP;
+        goto done;
+    }
+    // Get the backing array object of this buffer.
+    jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
+                     "java/nio/ByteBuffer", "array", "()[B");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hadoopReadZeroExtractBuffer: ByteBuffer#array failed: ");
+        goto done;
+    }
+    array = jVal.l;
+    if (!array) {
+        fputs("hadoopReadZeroExtractBuffer: ByteBuffer#array returned NULL.",
+              stderr);
+        ret = EIO;
+        goto done;
+    }
+    mallocBuf = malloc(buffer->length);
+    if (!mallocBuf) {
+        fprintf(stderr, "hadoopReadZeroExtractBuffer: failed to allocate %d bytes of memory\n",
+                buffer->length);
+        ret = ENOMEM;
+        goto done;
+    }
+    (*env)->GetByteArrayRegion(env, array, position, buffer->length, mallocBuf);
+    jthr = (*env)->ExceptionOccurred(env);
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hadoopReadZeroExtractBuffer: GetByteArrayRegion failed: ");
+        goto done;
+    }
+    buffer->ptr = mallocBuf;
+    // Ownership of the copy has moved to the buffer.  Clear our pointer so
+    // that the cleanup code below does not free memory which buffer->ptr
+    // still refers to.
+    mallocBuf = NULL;
+    buffer->direct = 0;
+    ret = 0;
+
+done:
+    // mallocBuf is only non-NULL here if we failed after allocating it.
+    free(mallocBuf);
+    (*env)->DeleteLocalRef(env, array);
+    return ret;
+}
+
+/**
+ * Translate an exception thrown by the zero-copy read call into an error
+ * code, releasing the local reference to the exception.
+ *
+ * @param env   The JNI environment.
+ * @param exc   The exception which was thrown.  The local reference is
+ *              released before this function returns.
+ *
+ * @return      EPROTONOSUPPORT if exc is a
+ *              java.lang.UnsupportedOperationException; EIO if the class
+ *              name of exc could not be determined; otherwise, the code
+ *              returned by printExceptionAndFree.
+ */
+static int translateZCRException(JNIEnv *env, jthrowable exc)
+{
+    int ret;
+    char *className = NULL;
+    jthrowable jthr = classNameOfObject(exc, env, &className);
+
+    if (jthr) {
+        fputs("hadoopReadZero: failed to get class name of "
+                "exception from read().\n", stderr);
+        destroyLocalReference(env, exc);
+        destroyLocalReference(env, jthr);
+        ret = EIO;
+        goto done;
+    }
+    if (!strcmp(className, "java.lang.UnsupportedOperationException")) {
+        ret = EPROTONOSUPPORT;
+        // Nothing else releases exc on this path, so do it here.
+        destroyLocalReference(env, exc);
+        goto done;
+    }
+    // jthr is NULL at this point; the exception to report and free is exc.
+    ret = printExceptionAndFree(env, exc, PRINT_EXC_ALL,
+            "hadoopZeroCopyRead: ZeroCopyCursor#read failed");
+done:
+    free(className);
+    return ret;
+}
+
+/**
+ * Perform a byte buffer read on an input stream.
+ *
+ * @param file        The file to read from.  It must be open for reading.
+ * @param opts        The options to use for this read.
+ * @param maxLength   The maximum number of bytes to read.
+ *
+ * @return            A newly allocated hadoopRzBuffer on success, with errno
+ *                    set to 0.  If the read returned no ByteBuffer, the
+ *                    buffer has a NULL pointer and a length of 0.
+ *                    NULL on failure, with errno set to the error code.
+ */
+struct hadoopRzBuffer* hadoopReadZero(hdfsFile file,
+            struct hadoopRzOptions *opts, int32_t maxLength)
+{
+    JNIEnv *env;
+    jthrowable exc = NULL;
+    jvalue readResult;
+    jobject readOpts = NULL, localBuf = NULL;
+    struct hadoopRzBuffer* rzBuf = NULL;
+    int err = 0;
+
+    env = getJNIEnv();
+    if (!env) {
+        errno = EINTERNAL;
+        return NULL;
+    }
+    if (file->type != INPUT) {
+        fputs("Cannot read from a non-InputStream object!\n", stderr);
+        err = EINVAL;
+        goto done;
+    }
+    rzBuf = calloc(1, sizeof(*rzBuf));
+    if (!rzBuf) {
+        err = ENOMEM;
+        goto done;
+    }
+    exc = hadoopRzOptionsGetEnumSet(env, opts, &readOpts);
+    if (exc) {
+        err = printExceptionAndFree(env, exc, PRINT_EXC_ALL,
+                "hadoopReadZero: hadoopRzOptionsGetEnumSet failed: ");
+        goto done;
+    }
+    exc = invokeMethod(env, &readResult, INSTANCE, file->file, HADOOP_ISTRM,
+        "read",
+        "(Lorg/apache/hadoop/io/ByteBufferPool;ILjava/util/EnumSet;)"
+        "Ljava/nio/ByteBuffer;", opts->byteBufferPool, maxLength, readOpts);
+    if (exc) {
+        err = translateZCRException(env, exc);
+        goto done;
+    }
+    localBuf = readResult.l;
+    if (localBuf) {
+        // Keep the ByteBuffer alive for as long as the caller holds rzBuf.
+        rzBuf->byteBuffer = (*env)->NewGlobalRef(env, localBuf);
+        if (!rzBuf->byteBuffer) {
+            err = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+                "hadoopReadZero: failed to create global ref to ByteBuffer");
+            goto done;
+        }
+        err = hadoopReadZeroExtractBuffer(env, opts, rzBuf);
+    } else {
+        // The read returned no ByteBuffer; hand back an empty buffer.
+        rzBuf->byteBuffer = NULL;
+        rzBuf->length = 0;
+        rzBuf->ptr = NULL;
+    }
+done:
+    (*env)->DeleteLocalRef(env, localBuf);
+    if (!err) {
+        errno = 0;
+        return rzBuf;
+    }
+    // Failure: release everything we allocated before reporting the error.
+    if (rzBuf) {
+        if (rzBuf->byteBuffer) {
+            (*env)->DeleteGlobalRef(env, rzBuf->byteBuffer);
+        }
+        free(rzBuf);
+    }
+    errno = err;
+    return NULL;
+}
+
+/**
+ * Get the length of a buffer.
+ *
+ * @param buffer   The buffer to examine.
+ * @return         The value of the buffer's length field.
+ */
+int32_t hadoopRzBufferLength(const struct hadoopRzBuffer *buffer)
+{
+    int32_t numBytes = buffer->length;
+
+    return numBytes;
+}
+
+/**
+ * Get the data pointer of a buffer.
+ *
+ * @param buffer   The buffer to examine.
+ * @return         The value of the buffer's ptr field.
+ */
+const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer)
+{
+    const void *data = buffer->ptr;
+
+    return data;
+}
+
+/**
+ * Release a buffer and all of the resources associated with it.
+ *
+ * If we cannot get a JNI environment, errno is set to EINTERNAL and nothing
+ * is released.
+ *
+ * @param file     The stream whose releaseBuffer method will be invoked.
+ * @param buffer   The buffer to release.
+ */
+void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer)
+{
+    jvalue unused;
+    jthrowable exc;
+    JNIEnv* env = getJNIEnv();
+
+    if (!env) {
+        errno = EINTERNAL;
+        return;
+    }
+    if (buffer->byteBuffer != NULL) {
+        exc = invokeMethod(env, &unused, INSTANCE, file->file,
+                    HADOOP_ISTRM, "releaseBuffer",
+                    "(Ljava/nio/ByteBuffer;)V", buffer->byteBuffer);
+        if (exc != NULL) {
+            // Report the failure, but carry on: the global reference has to
+            // be deleted whether or not releaseBuffer succeeded.
+            printExceptionAndFree(env, exc, PRINT_EXC_ALL,
+                    "hadoopRzBufferFree: releaseBuffer failed: ");
+        }
+        (*env)->DeleteGlobalRef(env, buffer->byteBuffer);
+    }
+    // For a non-direct buffer, ptr is heap memory rather than memory that
+    // belongs to the ByteBuffer.
+    if (!buffer->direct) {
+        free(buffer->ptr);
+    }
+    memset(buffer, 0, sizeof(*buffer));
+    free(buffer);
+}
+
 char***
 hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length)
 {

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h?rev=1527697&r1=1527696&r2=1527697&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h Mon Sep 30 18:28:07 2013
@@ -36,6 +36,8 @@
 #define EINTERNAL 255 
 #endif
 
+#define ELASTIC_BYTE_BUFFER_POOL_CLASS \
+  "org/apache/hadoop/io/ElasticByteBufferPool"
 
 /** All APIs set errno to meaningful values */
 
@@ -65,6 +67,10 @@ extern  "C" {
     struct hdfsFile_internal;
     typedef struct hdfsFile_internal* hdfsFile;
 
+    struct hadoopRzOptions;
+
+    struct hadoopRzBuffer;
+
     /**
      * Determine if a file is open for read.
      *
@@ -85,6 +91,7 @@ extern  "C" {
       uint64_t totalBytesRead;
       uint64_t totalLocalBytesRead;
       uint64_t totalShortCircuitBytesRead;
+      uint64_t totalZeroCopyBytesRead;
     };
 
     /**
@@ -680,7 +687,107 @@ extern  "C" {
      * @return 0 on success else -1
      */
     int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime);
-    
+
+    /**
+     * Allocate a zero-copy options structure.
+     *
+     * You must free all options structures allocated with this function using
+     * hadoopRzOptionsFree.
+     *
+     * @return            A zero-copy options structure, or NULL if one could
+     *                    not be allocated.  If NULL is returned, errno will
+     *                    contain the error number.
+     */
+    struct hadoopRzOptions *hadoopRzOptionsAlloc(void);
+
+    /**
+     * Determine whether we should skip checksums in read0.
+     *
+     * @param opts        The options structure.
+     * @param skip        Nonzero to skip checksums sometimes; zero to always
+     *                    check them.
+     *
+     * @return            0 on success; -1 plus errno on failure.
+     */
+    int hadoopRzOptionsSetSkipChecksum(
+            struct hadoopRzOptions *opts, int skip);
+
+    /**
+     * Set the ByteBufferPool to use with read0.
+     *
+     * @param opts        The options structure.
+     * @param className   If this is NULL, we will not use any
+     *                    ByteBufferPool.  If this is non-NULL, it will be
+     *                    treated as the name of the pool class to use.
+     *                    For example, you can use
+     *                    ELASTIC_BYTE_BUFFER_POOL_CLASS.
+     *
+     * @return            0 if the ByteBufferPool class was found and
+     *                    instantiated;
+     *                    -1 plus errno otherwise.
+     */
+    int hadoopRzOptionsSetByteBufferPool(
+            struct hadoopRzOptions *opts, const char *className);
+
+    /**
+     * Free a hadoopRzOptions structure.
+     *
+     * @param opts        The options structure to free.
+     *                    Any associated ByteBufferPool will also be freed.
+     */
+    void hadoopRzOptionsFree(struct hadoopRzOptions *opts);
+
+    /**
+     * Perform a byte buffer read.
+     * If possible, this will be a zero-copy (mmap) read.
+     *
+     * @param file       The file to read from.
+     * @param opts       An options structure created by hadoopRzOptionsAlloc.
+     * @param maxLength  The maximum length to read.  We may read fewer bytes
+     *                   than this length.
+     *
+     * @return           On success, returns a new hadoopRzBuffer.
+     *                   This buffer will continue to be valid and readable
+     *                   until it is released by hadoopRzBufferFree.  Failure to
+     *                   release a buffer will lead to a memory leak.
+     *
+     *                   NULL plus an errno code on an error.
+     *                   errno = EPROTONOSUPPORT indicates that we could not
+     *                   do a zero-copy read, and there was no ByteBufferPool
+     *                   supplied.  errno = ENOTSUP indicates that the JVM
+     *                   could not give us the address of the buffer.
+     */
+    struct hadoopRzBuffer* hadoopReadZero(hdfsFile file,
+            struct hadoopRzOptions *opts, int32_t maxLength);
+
+    /**
+     * Determine the length of the buffer returned from readZero.
+     *
+     * @param buffer     a buffer returned from readZero.
+     * @return           the length of the buffer.
+     */
+    int32_t hadoopRzBufferLength(const struct hadoopRzBuffer *buffer);
+
+    /**
+     * Get a pointer to the raw buffer returned from readZero.
+     *
+     * To find out how many bytes this buffer contains, call
+     * hadoopRzBufferLength.
+     *
+     * @param buffer     a buffer returned from readZero.
+     * @return           a pointer to the start of the buffer.  This will be
+     *                   NULL when end-of-file has been reached.
+     */
+    const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer);
+
+    /**
+     * Release a buffer obtained through readZero.
+     *
+     * @param file       The hdfs stream that created this buffer.  This must be
+     *                   the same stream you called hadoopReadZero on.
+     * @param buffer     The buffer to release.
+     */
+    void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer);
+
 #ifdef __cplusplus
 }
 #endif

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs_test.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs_test.h?rev=1527697&r1=1527696&r2=1527697&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs_test.h (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs_test.h Mon Sep 30 18:28:07 2013
@@ -48,6 +48,15 @@ extern  "C" {
      * @param file     The HDFS file
      */
     void hdfsFileDisableDirectRead(struct hdfsFile_internal *file);
+
+    /**
+     * Disable domain socket security checks.
+     *
+     * @return         0 if domain socket security was disabled;
+     *                 -1 if not.
+     */
+    int hdfsDisableDomainSocketSecurity(void); 
+
 #ifdef __cplusplus
 }
 #endif

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c?rev=1527697&r1=1527696&r2=1527697&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c Mon Sep 30 18:28:07 2013
@@ -608,3 +608,73 @@ JNIEnv* getJNIEnv(void)
     return env;
 }
 
+/**
+ * Check whether a Java object is an instance of the named class.
+ *
+ * @param env    The JNI environment.
+ * @param obj    The object to test.
+ * @param name   The name of the class to look up.
+ *
+ * @return       -1 if the class could not be found; 1 if obj is an
+ *               instance of the class; 0 otherwise.
+ */
+int javaObjectIsOfClass(JNIEnv *env, jobject obj, const char *name)
+{
+    jboolean isInstance;
+    jclass cls = (*env)->FindClass(env, name);
+
+    if (cls == NULL) {
+        printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+            "javaObjectIsOfClass(%s)", name);
+        return -1;
+    }
+    isInstance = (*env)->IsInstanceOf(env, obj, cls);
+    // The class reference is only needed for the check above.
+    (*env)->DeleteLocalRef(env, cls);
+    if (isInstance == JNI_TRUE) {
+        return 1;
+    }
+    return 0;
+}
+
+/**
+ * Set a string value on a Configuration object by invoking its
+ * set(String, String) method.
+ *
+ * @param env              The JNI environment.
+ * @param jConfiguration   The Configuration object to modify.
+ * @param key              The key to set.
+ * @param value            The value to give the key.
+ *
+ * @return                 NULL on success; the exception otherwise.
+ */
+jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
+        const char *key, const char *value)
+{
+    jthrowable exc;
+    jstring jKeyStr = NULL, jValStr = NULL;
+
+    // Each step runs only if every step before it succeeded.
+    exc = newJavaStr(env, key, &jKeyStr);
+    if (!exc) {
+        exc = newJavaStr(env, value, &jValStr);
+    }
+    if (!exc) {
+        exc = invokeMethod(env, NULL, INSTANCE, jConfiguration,
+                "org/apache/hadoop/conf/Configuration", "set", 
+                "(Ljava/lang/String;Ljava/lang/String;)V",
+                jKeyStr, jValStr);
+    }
+    (*env)->DeleteLocalRef(env, jKeyStr);
+    (*env)->DeleteLocalRef(env, jValStr);
+    return exc;
+}
+
+/**
+ * Fetch a value of a Java enum by reading the static field of that name
+ * from the enum class.
+ *
+ * @param env         The JNI environment.
+ * @param className   The name of the enum class, in the form which
+ *                    FindClass accepts.
+ * @param valueName   The name of the enum value.
+ * @param out         (out param) on success, a local reference to the enum
+ *                    value.
+ *
+ * @return            NULL on success; the exception otherwise.
+ */
+jthrowable fetchEnumInstance(JNIEnv *env, const char *className,
+                         const char *valueName, jobject *out)
+{
+    jclass clazz;
+    jfieldID fieldId;
+    jobject jEnum;
+    jthrowable jthr = NULL;
+    char prettyClass[256];
+    int len;
+
+    clazz = (*env)->FindClass(env, className);
+    if (!clazz) {
+        return newRuntimeError(env, "fetchEnum(%s, %s): failed to find class.",
+                className, valueName);
+    }
+    // The type signature of the field is the class name wrapped in "L...;".
+    len = snprintf(prettyClass, sizeof(prettyClass), "L%s;", className);
+    if ((len < 0) || ((size_t)len >= sizeof(prettyClass))) {
+        jthr = newRuntimeError(env, "fetchEnum(%s, %s): class name too long.",
+                className, valueName);
+        goto done;
+    }
+    fieldId = (*env)->GetStaticFieldID(env, clazz, valueName, prettyClass);
+    if (!fieldId) {
+        jthr = getPendingExceptionAndClear(env);
+        goto done;
+    }
+    jEnum = (*env)->GetStaticObjectField(env, clazz, fieldId);
+    if (!jEnum) {
+        jthr = getPendingExceptionAndClear(env);
+        goto done;
+    }
+    *out = jEnum;
+done:
+    // Release the class reference on every path, so that repeated calls do
+    // not accumulate local references.
+    (*env)->DeleteLocalRef(env, clazz);
+    return jthr;
+}
+

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h?rev=1527697&r1=1527696&r2=1527697&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h Mon Sep 30 18:28:07 2013
@@ -114,6 +114,47 @@ jthrowable classNameOfObject(jobject job
  * */
 JNIEnv* getJNIEnv(void);
 
+/**
+ * Figure out if a Java object is an instance of a particular class.
+ *
+ * @param env  The Java environment.
+ * @param obj  The object to check.
+ * @param name The class name to check.
+ *
+ * @return     -1 if we failed to find the referenced class name.
+ *             0 if the object is not of the given class.
+ *             1 if the object is of the given class.
+ */
+int javaObjectIsOfClass(JNIEnv *env, jobject obj, const char *name);
+
+/**
+ * Set a value in a configuration object.
+ *
+ * @param env               The JNI environment
+ * @param jConfiguration    The configuration object to modify
+ * @param key               The key to modify
+ * @param value             The value to set the key to
+ *
+ * @return                  NULL on success; exception otherwise
+ */
+jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
+        const char *key, const char *value);
+
+/**
+ * Fetch an instance of an Enum.
+ *
+ * @param env               The JNI environment.
+ * @param className         The enum class name.
+ * @param valueName         The name of the enum value
+ * @param out               (out param) on success, a local reference to an
+ *                          instance of the enum object.  (Since Java enums are
+ *                          singletons, this is also the only instance.)
+ *
+ * @return                  NULL on success; exception otherwise
+ */
+jthrowable fetchEnumInstance(JNIEnv *env, const char *className,
+                             const char *valueName, jobject *out);
+
 #endif /*LIBHDFS_JNI_HELPER_H*/
 
 /**

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c?rev=1527697&r1=1527696&r2=1527697&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c Mon Sep 30 18:28:07 2013
@@ -17,14 +17,19 @@
  */
 
 #include "exception.h"
+#include "hdfs.h"
+#include "hdfs_test.h"
 #include "jni_helper.h"
 #include "native_mini_dfs.h"
 
 #include <errno.h>
 #include <jni.h>
+#include <limits.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <sys/types.h>
+#include <unistd.h>
 
 #define MINIDFS_CLUSTER_BUILDER "org/apache/hadoop/hdfs/MiniDFSCluster$Builder"
 #define MINIDFS_CLUSTER "org/apache/hadoop/hdfs/MiniDFSCluster"
@@ -39,8 +44,44 @@ struct NativeMiniDfsCluster {
      * The NativeMiniDfsCluster object
      */
     jobject obj;
+
+    /**
+     * Path to the domain socket, or the empty string if there is none.
+     */
+    char domainSocketPath[PATH_MAX];
 };
 
+/**
+ * Configure short-circuit reads for a cluster which is being created.
+ *
+ * This disables domain socket security, enables
+ * dfs.client.read.shortcircuit, and sets dfs.domain.socket.path to a path
+ * under $TMPDIR (or /tmp if TMPDIR is not set).  The path is stored in
+ * cl->domainSocketPath.
+ *
+ * @param env    The JNI environment.
+ * @param cl     The cluster whose domainSocketPath will be filled in.
+ * @param cobj   The Configuration object to modify.
+ *
+ * @return       NULL on success; the exception otherwise.
+ */
+static jthrowable nmdConfigureShortCircuit(JNIEnv *env,
+              struct NativeMiniDfsCluster *cl, jobject cobj)
+{
+    jthrowable jthr;
+    const char *tmpDir;
+
+    int ret = hdfsDisableDomainSocketSecurity();
+    if (ret) {
+        return newRuntimeError(env, "failed to disable hdfs domain "
+                               "socket security: error %d", ret);
+    }
+    jthr = hadoopConfSetStr(env, cobj, "dfs.client.read.shortcircuit", "true");
+    if (jthr) {
+        return jthr;
+    }
+    tmpDir = getenv("TMPDIR");
+    if (!tmpDir) {
+        tmpDir = "/tmp";
+    }
+    // The pid and a random number make the socket name unlikely to collide
+    // with that of another cluster using the same directory.
+    snprintf(cl->domainSocketPath, PATH_MAX, "%s/native_mini_dfs.sock.%d.%d",
+             tmpDir, getpid(), rand());
+    jthr = hadoopConfSetStr(env, cobj, "dfs.domain.socket.path",
+                            cl->domainSocketPath);
+    if (jthr) {
+        return jthr;
+    }
+    return NULL;
+}
+
 struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
 {
     struct NativeMiniDfsCluster* cl = NULL;
@@ -81,6 +122,28 @@ struct NativeMiniDfsCluster* nmdCreate(s
             goto error;
         }
     }
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                              "nmdCreate: Configuration::setBoolean");
+        goto error;
+    }
+    // Disable 'minimum block size' -- it's annoying in tests.
+    (*env)->DeleteLocalRef(env, jconfStr);
+    jconfStr = NULL;
+    jthr = newJavaStr(env, "dfs.namenode.fs-limits.min-block-size", &jconfStr);
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                              "nmdCreate: new String");
+        goto error;
+    }
+    jthr = invokeMethod(env, NULL, INSTANCE, cobj, HADOOP_CONF,
+                        "setLong", "(Ljava/lang/String;J)V", jconfStr, 0LL);
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                              "nmdCreate: Configuration::setLong");
+        goto error;
+    }
+    // Create MiniDFSCluster object
     jthr = constructNewObjectOfClass(env, &bld, MINIDFS_CLUSTER_BUILDER,
                     "(L"HADOOP_CONF";)V", cobj);
     if (jthr) {
@@ -88,6 +151,14 @@ struct NativeMiniDfsCluster* nmdCreate(s
             "nmdCreate: NativeMiniDfsCluster#Builder#Builder");
         goto error;
     }
+    if (conf->configureShortCircuit) {
+        jthr = nmdConfigureShortCircuit(env, cl, cobj);
+        if (jthr) {
+            printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "nmdCreate: nmdConfigureShortCircuit error");
+            goto error;
+        }
+    }
     jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
             "format", "(Z)L" MINIDFS_CLUSTER_BUILDER ";", conf->doFormat);
     if (jthr) {
@@ -272,3 +343,29 @@ error_dlr_nn:
     
     return ret;
 }
+
+/**
+ * Point an hdfsBuilder at the given cluster.
+ *
+ * The NameNode host is set to "localhost" and the port to the cluster's
+ * NameNode port.  If the cluster has a domain socket path, the builder is
+ * also configured for short-circuit reads using that path.
+ *
+ * @param cl    The cluster to connect to.
+ * @param bld   The builder to configure.
+ *
+ * @return      0 on success; EIO if the NameNode port could not be
+ *              determined; otherwise, the error returned by
+ *              hdfsBuilderConfSetStr.
+ */
+int nmdConfigureHdfsBuilder(struct NativeMiniDfsCluster *cl,
+                            struct hdfsBuilder *bld)
+{
+    int nnPort, err;
+
+    hdfsBuilderSetNameNode(bld, "localhost");
+    nnPort = nmdGetNameNodePort(cl);
+    if (nnPort < 0) {
+        fprintf(stderr, "nmdGetNameNodePort failed with error %d\n", -nnPort);
+        return EIO;
+    }
+    hdfsBuilderSetNameNodePort(bld, nnPort);
+    if (!cl->domainSocketPath[0]) {
+        // No domain socket, so there is nothing more to configure.
+        return 0;
+    }
+    err = hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit", "true");
+    if (!err) {
+        err = hdfsBuilderConfSetStr(bld, "dfs.domain.socket.path",
+                                    cl->domainSocketPath);
+    }
+    return err;
+}

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h?rev=1527697&r1=1527696&r2=1527697&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h Mon Sep 30 18:28:07 2013
@@ -21,6 +21,7 @@
 
 #include <jni.h> /* for jboolean */
 
+struct hdfsBuilder;
 struct NativeMiniDfsCluster; 
 
 /**
@@ -28,17 +29,24 @@ struct NativeMiniDfsCluster; 
  */
 struct NativeMiniDfsConf {
     /**
-     * Nonzero if the cluster should be formatted prior to startup
+     * Nonzero if the cluster should be formatted prior to startup.
      */
     jboolean doFormat;
+
     /**
      * Whether or not to enable webhdfs in MiniDfsCluster
      */
     jboolean webhdfsEnabled;
+
     /**
      * The http port of the namenode in MiniDfsCluster
      */
     jint namenodeHttpPort;
+
+    /**
+     * Nonzero if we should configure short circuit.
+     */
+    jboolean configureShortCircuit;
 };
 
 /**
@@ -84,7 +92,7 @@ void nmdFree(struct NativeMiniDfsCluster
  *
  * @return          the port, or a negative error code
  */
-int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl);
+int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl); 
 
 /**
  * Get the http address that's in use by the given (non-HA) nativeMiniDfs
@@ -101,4 +109,14 @@ int nmdGetNameNodePort(const struct Nati
 int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
                                int *port, const char **hostName);
 
+/**
+ * Configure the HDFS builder appropriately to connect to this cluster.
+ *
+ * @param cl        The cluster to connect to
+ * @param bld       The hdfs builder
+ *
+ * @return          0 on success; a nonzero error code otherwise
+ */
+int nmdConfigureHdfsBuilder(struct NativeMiniDfsCluster *cl,
+                            struct hdfsBuilder *bld);
+
 #endif