Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2013/09/24 23:40:54 UTC

svn commit: r1526020 [1/2] - in /hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/native/libhdfs/ src/main/native/libhdfs/test/ src/test/jav...

Author: cmccabe
Date: Tue Sep 24 21:40:53 2013
New Revision: 1526020

URL: http://svn.apache.org/r1526020
Log:
HDFS-5191. Revisit zero-copy API in FSDataInputStream to make it more intuitive (Contributed by Colin Patrick McCabe)

Added:
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
Removed:
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsZeroCopyCursor.java
Modified:
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
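
For orientation, the sketch below (not part of the commit) shows how the
reworked libhdfs zero-copy API added in this change is intended to be called.
It assumes an already-open, read-only hdfsFile and omits most error handling;
all of the functions it uses are declared in the revised hdfs.h further down.
Skipping checksums is what enables the mmap fast path.

    #include "hdfs.h"
    #include <stdio.h>

    /* Illustrative only: read up to maxLength bytes via the new
     * zero-copy path. */
    static int32_t readOnceZeroCopy(hdfsFile file, int32_t maxLength)
    {
        struct hadoopRzOptions *opts = hadoopRzOptionsAlloc();
        if (!opts)
            return -1;
        hadoopRzOptionsSetSkipChecksum(opts, 1);
        struct hadoopRzBuffer *buf = hadoopReadZero(file, opts, maxLength);
        if (!buf) {
            hadoopRzOptionsFree(opts);
            return -1;                     /* errno describes the failure */
        }
        int32_t len = hadoopRzBufferLength(buf);
        const void *data = hadoopRzBufferGet(buf);  /* NULL at end-of-file */
        if (data)
            fprintf(stderr, "read %d bytes via zero-copy\n", (int)len);
        hadoopRzBufferFree(file, buf);     /* every buffer must be released */
        hadoopRzOptionsFree(opts);
        return len;
    }

Without a ByteBufferPool configured, hadoopReadZero fails rather than copying
when an mmap read is not possible; the sketches after the hdfs.c and hdfs.h
sections below cover that case.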

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt Tue Sep 24 21:40:53 2013
@@ -45,6 +45,9 @@ HDFS-4949 (Unreleased)
     HDFS-5236. Change PathBasedCacheDirective APIs to be a single value
     rather than batch. (Contributed by Andrew Wang)
 
+    HDFS-5191. Revisit zero-copy API in FSDataInputStream to make it more
+    intuitive.  (Contributed by Colin Patrick McCabe)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Tue Sep 24 21:40:53 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.hdfs.client.ClientMmap;
 import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 
@@ -28,6 +29,7 @@ import org.apache.hadoop.hdfs.protocol.L
  * from a single datanode.
  */
 public interface BlockReader extends ByteBufferReadable {
+  
 
   /* same interface as inputStream java.io.InputStream#read()
    * used by DFSInputStream#read()
@@ -85,19 +87,12 @@ public interface BlockReader extends Byt
   boolean isShortCircuit();
 
   /**
-   * Do a zero-copy read with the current block reader.
-   *
-   * We assume that the calling code has done bounds checking, and won't ask 
-   * us for more bytes than are supposed to be visible (or are in the file).
+   * Get a ClientMmap object for this BlockReader.
    *
-   * @param buffers       The zero-copy buffers object.
    * @param curBlock      The current block.
-   * @param blockPos      Position in the current block to start reading at.
-   * @param toRead        The number of bytes to read from the block.
-   * 
-   * @return              true if the read was done, false otherwise.
+   * @return              The ClientMmap object, or null if mmap is not
+   *                      supported.
    */
-  boolean readZeroCopy(HdfsZeroCopyCursor buffers,
-        LocatedBlock curBlock, long blockPos, int toRead,
+  ClientMmap getClientMmap(LocatedBlock curBlock,
         ClientMmapManager mmapManager);
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Tue Sep 24 21:40:53 2013
@@ -548,46 +548,28 @@ class BlockReaderLocal implements BlockR
   }
 
   @Override
-  public boolean readZeroCopy(HdfsZeroCopyCursor cursor,
-        LocatedBlock curBlock, long blockPos, int toRead,
-        ClientMmapManager mmapManager) {
+  public ClientMmap getClientMmap(LocatedBlock curBlock,
+      ClientMmapManager mmapManager) {
     if (clientMmap == null) {
       if (mmapDisabled) {
-        return false;
+        return null;
       }
       try {
         clientMmap = mmapManager.fetch(datanodeID, block, dataIn);
         if (clientMmap == null) {
           mmapDisabled = true;
-          return false;
+          return null;
         }
       } catch (InterruptedException e) {
         LOG.error("Interrupted while setting up mmap for " + filename, e);
         Thread.currentThread().interrupt();
-        return false;
+        return null;
       } catch (IOException e) {
         LOG.error("unable to set up mmap for " + filename, e);
         mmapDisabled = true;
-        return false;
+        return null;
       }
     }
-    long limit = blockPos + toRead;
-    if (limit > Integer.MAX_VALUE) {
-      /*
-       * In Java, ByteBuffers use a 32-bit length, capacity, offset, etc.
-       * This limits our mmap'ed regions to 2 GB in length.
-       * TODO: we can implement zero-copy for larger blocks by doing multiple
-       * mmaps.
-       */
-      mmapDisabled = true;
-      clientMmap.unref();
-      clientMmap = null;
-      return false;
-    }
-    ByteBuffer mmapBuffer = clientMmap.getMappedByteBuffer().duplicate();
-    mmapBuffer.position((int)blockPos);
-    mmapBuffer.limit((int)limit);
-    cursor.setMmap(clientMmap, mmapBuffer);
-    return true;
+    return clientMmap;
   }
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java Tue Sep 24 21:40:53 2013
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.hadoop.hdfs.client.ClientMmap;
 import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -705,9 +706,8 @@ class BlockReaderLocalLegacy implements 
   }
 
   @Override
-  public boolean readZeroCopy(HdfsZeroCopyCursor buffers,
-      LocatedBlock curBlock, long blockPos, int toRead,
+  public ClientMmap getClientMmap(LocatedBlock curBlock,
       ClientMmapManager mmapManager) {
-    return false;
+    return null;
   }
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Tue Sep 24 21:40:53 2013
@@ -2610,7 +2610,8 @@ public class DFSClient implements java.i
     return defaultWriteCachingStrategy;
   }
 
-  ClientMmapManager getMmapManager() {
+  @VisibleForTesting
+  public ClientMmapManager getMmapManager() {
     return mmapManager;
   }
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Tue Sep 24 21:40:53 2013
@@ -24,6 +24,7 @@ import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -36,12 +37,15 @@ import java.util.concurrent.ConcurrentHa
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.ByteBufferUtil;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CanSetReadahead;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
+import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.fs.ZeroCopyCursor;
+import org.apache.hadoop.hdfs.client.ClientMmap;
 import org.apache.hadoop.hdfs.net.DomainPeer;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
@@ -55,12 +59,14 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
+import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.IdentityHashStore;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -70,7 +76,8 @@ import com.google.common.annotations.Vis
  ****************************************************************/
 @InterfaceAudience.Private
 public class DFSInputStream extends FSInputStream
-implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
+implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
+    HasEnhancedByteBufferAccess {
   @VisibleForTesting
   static boolean tcpReadsDisabledForTesting = false;
   private final PeerCache peerCache;
@@ -88,6 +95,15 @@ implements ByteBufferReadable, CanSetDro
   private CachingStrategy cachingStrategy;
   private final ReadStatistics readStatistics = new ReadStatistics();
 
+  /**
+   * Track the ByteBuffers that we have handed out to readers.
+   * 
+   * The value type can be either ByteBufferPool or ClientMmap, depending on
+   * whether this is a memory-mapped buffer or not.
+   */
+  private final IdentityHashStore<ByteBuffer, Object>
+      extendedReadBuffers = new IdentityHashStore<ByteBuffer, Object>(0);
+
   public static class ReadStatistics {
     public ReadStatistics() {
       this.totalBytesRead = 0;
@@ -606,6 +622,20 @@ implements ByteBufferReadable, CanSetDro
     }
     dfsClient.checkOpen();
 
+    if (!extendedReadBuffers.isEmpty()) {
+      final StringBuilder builder = new StringBuilder();
+      extendedReadBuffers.visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() {
+        private String prefix = "";
+        @Override
+        public void accept(ByteBuffer k, Object v) {
+          builder.append(prefix).append(k);
+          prefix = ", ";
+        }
+      });
+      DFSClient.LOG.warn("closing file " + src + ", but there are still " +
+          "unreleased ByteBuffers allocated by read().  " +
+          "Please release " + builder.toString() + ".");
+    }
     if (blockReader != null) {
       blockReader.close();
       blockReader = null;
@@ -1413,9 +1443,11 @@ implements ByteBufferReadable, CanSetDro
     closeCurrentBlockReader();
   }
 
-  synchronized void readZeroCopy(HdfsZeroCopyCursor zcursor, int toRead)
-      throws IOException {
-    assert(toRead > 0);
+  @Override
+  public synchronized ByteBuffer read(ByteBufferPool bufferPool,
+      int maxLength, EnumSet<ReadOption> opts) 
+          throws IOException, UnsupportedOperationException {
+    assert(maxLength > 0);
     if (((blockReader == null) || (blockEnd == -1)) &&
           (pos < getFileLength())) {
       /*
@@ -1429,50 +1461,81 @@ implements ByteBufferReadable, CanSetDro
             "at position " + pos);
       }
     }
+    boolean canSkipChecksums = opts.contains(ReadOption.SKIP_CHECKSUMS);
+    if (canSkipChecksums) {
+      ByteBuffer buffer = tryReadZeroCopy(maxLength);
+      if (buffer != null) {
+        return buffer;
+      }
+    }
+    ByteBuffer buffer = ByteBufferUtil.
+        fallbackRead(this, bufferPool, maxLength);
+    if (buffer != null) {
+      extendedReadBuffers.put(buffer, bufferPool);
+    }
+    return buffer;
+  }
+
+  private synchronized ByteBuffer tryReadZeroCopy(int maxLength)
+      throws IOException {
+    // Java ByteBuffers can't be longer than 2 GB, because they use
+    // 4-byte signed integers to represent capacity, etc.
+    // So we can't mmap the parts of the block higher than the 2 GB offset.
+    // FIXME: we could work around this with multiple memory maps.
+    // See HDFS-5101.
+    long blockEnd32 = Math.min(Integer.MAX_VALUE, blockEnd);
     long curPos = pos;
-    boolean canSkipChecksums = zcursor.getSkipChecksums();
-    long blockLeft = blockEnd - curPos + 1;
-    if (zcursor.getAllowShortReads()) {
-      if (blockLeft < toRead) {
-        toRead = (int)blockLeft;
+    long blockLeft = blockEnd32 - curPos + 1;
+    if (blockLeft <= 0) {
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
+          curPos + " of " + src + "; blockLeft = " + blockLeft +
+          "; blockEnd32 = " + blockEnd32 + ", blockEnd = " + blockEnd +
+          "; maxLength = " + maxLength);
       }
+      return null;
     }
-    if (canSkipChecksums && (toRead <= blockLeft)) {
-      long blockStartInFile = currentLocatedBlock.getStartOffset();
-      long blockPos = curPos - blockStartInFile;
-      if (blockReader.readZeroCopy(zcursor,
-            currentLocatedBlock, blockPos, toRead,
-            dfsClient.getMmapManager())) {
-        if (DFSClient.LOG.isDebugEnabled()) {
-          DFSClient.LOG.debug("readZeroCopy read " + toRead + " bytes from " +
-              "offset " + curPos + " via the zero-copy read path.  " +
-              "blockEnd = " + blockEnd);
-        }
-        readStatistics.addZeroCopyBytes(toRead);
-        seek(pos + toRead);
-        return;
+    int length = Math.min((int)blockLeft, maxLength);
+    long blockStartInFile = currentLocatedBlock.getStartOffset();
+    long blockPos = curPos - blockStartInFile;
+    long limit = blockPos + length;
+    ClientMmap clientMmap =
+        blockReader.getClientMmap(currentLocatedBlock,
+            dfsClient.getMmapManager());
+    if (clientMmap == null) {
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
+          curPos + " of " + src + "; BlockReader#getClientMmap returned " +
+          "null.");
       }
+      return null;
     }
-    /*
-     * Slow path reads.
-     *
-     * readStatistics will be updated when we call back into this
-     * stream's read methods.
-     */
-    long prevBlockEnd = blockEnd;
-    int slowReadAmount = zcursor.readViaSlowPath(toRead);
+    seek(pos + length);
+    ByteBuffer buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
+    buffer.position((int)blockPos);
+    buffer.limit((int)limit);
+    clientMmap.ref();
+    extendedReadBuffers.put(buffer, clientMmap);
+    readStatistics.addZeroCopyBytes(length);
     if (DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("readZeroCopy read " + slowReadAmount + " bytes " +
-          "from offset " + curPos + " via the fallback read path.  " +
-          "prevBlockEnd = " + prevBlockEnd + ", blockEnd = " + blockEnd +
-          ", canSkipChecksums = " + canSkipChecksums);
+      DFSClient.LOG.debug("readZeroCopy read " + maxLength + " bytes from " +
+          "offset " + curPos + " via the zero-copy read path.  " +
+          "blockEnd = " + blockEnd);
     }
+    return buffer;
   }
 
   @Override
-  public ZeroCopyCursor createZeroCopyCursor() 
-      throws IOException, UnsupportedOperationException {
-    return new HdfsZeroCopyCursor(this,
-        dfsClient.getConf().skipShortCircuitChecksums);
+  public synchronized void releaseBuffer(ByteBuffer buffer) {
+    Object val = extendedReadBuffers.remove(buffer);
+    if (val == null) {
+      throw new IllegalArgumentException("tried to release a buffer " +
+          "that was not created by this stream, " + buffer);
+    }
+    if (val instanceof ClientMmap) {
+      ((ClientMmap)val).unref();
+    } else if (val instanceof ByteBufferPool) {
+      ((ByteBufferPool)val).putBuffer(buffer);
+    }
   }
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Tue Sep 24 21:40:53 2013
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.ClientMmap;
 import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -489,9 +490,8 @@ public class RemoteBlockReader extends F
   }
 
   @Override
-  public boolean readZeroCopy(HdfsZeroCopyCursor buffers,
-      LocatedBlock curBlock, long blockPos, int toRead,
+  public ClientMmap getClientMmap(LocatedBlock curBlock,
       ClientMmapManager mmapManager) {
-    return false;
+    return null;
   }
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Tue Sep 24 21:40:53 2013
@@ -29,6 +29,7 @@ import java.nio.channels.ReadableByteCha
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.client.ClientMmap;
 import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -454,9 +455,8 @@ public class RemoteBlockReader2  impleme
   }
 
   @Override
-  public boolean readZeroCopy(HdfsZeroCopyCursor buffers,
-      LocatedBlock curBlock, long blockPos, int toRead,
+  public ClientMmap getClientMmap(LocatedBlock curBlock,
       ClientMmapManager manager) {
-    return false;
+    return null;
   }
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java Tue Sep 24 21:40:53 2013
@@ -361,6 +361,10 @@ public class ClientMmapManager implement
       }
       waitable.provide(mmap);
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("created a new ClientMmap for block " + key.block +
+          " on datanode " + key.datanode);
+    }
     return mmap;
   }
 
@@ -403,8 +407,10 @@ public class ClientMmapManager implement
     finally {
       lock.unlock();
     }
-    LOG.debug("reusing existing mmap with datanodeID=" + datanodeID +
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("reusing existing mmap with datanodeID=" + datanodeID +
               ", " + "block=" + block);
+    }
     return mmap;
   }
 

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c Tue Sep 24 21:40:53 2013
@@ -32,6 +32,22 @@ int expectFileStats(hdfsFile file,
 {
     struct hdfsReadStatistics *stats = NULL;
     EXPECT_ZERO(hdfsFileGetReadStatistics(file, &stats));
+    fprintf(stderr, "expectFileStats(expectedTotalBytesRead=%"PRId64", "
+            "expectedTotalLocalBytesRead=%"PRId64", "
+            "expectedTotalShortCircuitBytesRead=%"PRId64", "
+            "expectedTotalZeroCopyBytesRead=%"PRId64", "
+            "totalBytesRead=%"PRId64", "
+            "totalLocalBytesRead=%"PRId64", "
+            "totalShortCircuitBytesRead=%"PRId64", "
+            "totalZeroCopyBytesRead=%"PRId64")\n",
+            expectedTotalBytesRead,
+            expectedTotalLocalBytesRead,
+            expectedTotalShortCircuitBytesRead,
+            expectedTotalZeroCopyBytesRead,
+            stats->totalBytesRead,
+            stats->totalLocalBytesRead,
+            stats->totalShortCircuitBytesRead,
+            stats->totalZeroCopyBytesRead);
     if (expectedTotalBytesRead != UINT64_MAX) {
         EXPECT_INT64_EQ(expectedTotalBytesRead, stats->totalBytesRead);
     }

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c Tue Sep 24 21:40:53 2013
@@ -39,7 +39,7 @@
 #define JAVA_NET_ISA    "java/net/InetSocketAddress"
 #define JAVA_NET_URI    "java/net/URI"
 #define JAVA_STRING     "java/lang/String"
-#define HADOOP_ZERO_COPY_CURSOR "org/apache/hadoop/fs/ZeroCopyCursor"
+#define READ_OPTION     "org/apache/hadoop/fs/ReadOption"
 
 #define JAVA_VOID       "V"
 
@@ -2103,151 +2103,258 @@ int hdfsUtime(hdfsFS fs, const char* pat
     return 0;
 }
 
-struct hadoopZeroCopyCursor* hadoopZeroCopyCursorAlloc(hdfsFile file)
+/**
+ * Zero-copy options.
+ *
+ * We cache the EnumSet of ReadOptions which has to be passed into every
+ * readZero call, to avoid reconstructing it each time.  This cache is cleared
+ * whenever an element changes.
+ */
+struct hadoopRzOptions
 {
-    int ret;
-    jobject zcursor = NULL;
-    jvalue jVal;
-    jthrowable jthr;
-    JNIEnv* env;
+    JNIEnv *env;
+    int skipChecksums;
+    jobject byteBufferPool;
+    jobject cachedEnumSet;
+};
+
+struct hadoopRzOptions *hadoopRzOptionsAlloc(void)
+{
+    struct hadoopRzOptions *opts;
+    JNIEnv *env;
 
     env = getJNIEnv();
-    if (env == NULL) {
+    if (!env) {
+        // Check to make sure the JNI environment is set up properly.
         errno = EINTERNAL;
         return NULL;
     }
-    if (file->type != INPUT) {
-        ret = EINVAL;
-        goto done;
-    }
-    jthr = invokeMethod(env, &jVal, INSTANCE, (jobject)file->file, HADOOP_ISTRM,
-                     "createZeroCopyCursor", "()L"HADOOP_ZERO_COPY_CURSOR";");
-    if (jthr) {
-        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
-            "hadoopZeroCopyCursorAlloc: createZeroCopyCursor");
-        goto done;
-    }
-    zcursor = (*env)->NewGlobalRef(env, jVal.l);
-    if (!zcursor) {
-        ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
-            "hadoopZeroCopyCursorAlloc: NewGlobalRef"); 
-    }
-    ret = 0;
-done:
-    if (ret) {
-        errno = ret;
+    opts = calloc(1, sizeof(struct hadoopRzOptions));
+    if (!opts) {
+        errno = ENOMEM;
+        return NULL;
     }
-    return (struct hadoopZeroCopyCursor*)zcursor;
+    return opts;
 }
 
-int hadoopZeroCopyCursorSetFallbackBuffer(struct hadoopZeroCopyCursor* zcursor,
-                                          void *cbuf, uint32_t size)
+static void hadoopRzOptionsClearCached(JNIEnv *env,
+        struct hadoopRzOptions *opts)
 {
-    int ret;
-    jobject buffer = NULL;
-    jthrowable jthr;
-    JNIEnv* env;
+    if (!opts->cachedEnumSet) {
+        return;
+    }
+    (*env)->DeleteGlobalRef(env, opts->cachedEnumSet);
+    opts->cachedEnumSet = NULL;
+}
 
+int hadoopRzOptionsSetSkipChecksum(
+        struct hadoopRzOptions *opts, int skip)
+{
+    JNIEnv *env;
     env = getJNIEnv();
-    if (env == NULL) {
+    if (!env) {
         errno = EINTERNAL;
         return -1;
     }
-    buffer = (*env)->NewDirectByteBuffer(env, cbuf, size);
-    if (!buffer) {
-        ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
-            "hadoopZeroCopyCursorSetFallbackBuffer: NewDirectByteBuffer("
-            "size=%"PRId32"):", size);
-        goto done;
-    }
-    jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor, 
-                    HADOOP_ZERO_COPY_CURSOR, "setFallbackBuffer",
-                    "(Ljava/nio/ByteBuffer;)V", buffer);
-    if (jthr) {
-        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
-                      "hadoopZeroCopyCursorSetFallbackBuffer: "
-                      "FileSystem#setFallbackBuffer");
-        goto done;
-    }
-    ret = 0;
-done:
-    if (ret) {
-        (*env)->DeleteLocalRef(env, buffer);
-        errno = ret;
-        return -1;
-    }
+    hadoopRzOptionsClearCached(env, opts);
+    opts->skipChecksums = !!skip;
     return 0;
 }
 
-int hadoopZeroCopyCursorSetSkipChecksums(struct hadoopZeroCopyCursor* zcursor,
-                                         int skipChecksums)
+int hadoopRzOptionsSetByteBufferPool(
+        struct hadoopRzOptions *opts, const char *className)
 {
-    JNIEnv* env;
+    JNIEnv *env;
     jthrowable jthr;
-    jboolean shouldSkipChecksums = skipChecksums ? JNI_TRUE : JNI_FALSE; 
+    jobject byteBufferPool = NULL;
 
     env = getJNIEnv();
-    if (env == NULL) {
+    if (!env) {
         errno = EINTERNAL;
         return -1;
     }
-    jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor,
-        HADOOP_ZERO_COPY_CURSOR, "setSkipChecksums", "(Z)V",
-        shouldSkipChecksums);
+
+    // Note: we don't have to call hadoopRzOptionsClearCached in this
+    // function, since the ByteBufferPool is passed separately from the
+    // EnumSet of ReadOptions.
+
+    jthr = constructNewObjectOfClass(env, &byteBufferPool, className, "()V");
     if (jthr) {
-        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
-            "hadoopZeroCopyCursorSetSkipChecksums(): setSkipChecksums failed");
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "hadoopRzOptionsSetByteBufferPool(className=%s): ", className);
+        errno = EINVAL;
         return -1;
     }
+    if (opts->byteBufferPool) {
+        // Delete any previous ByteBufferPool we had.
+        (*env)->DeleteGlobalRef(env, opts->byteBufferPool);
+    }
+    opts->byteBufferPool = byteBufferPool;
     return 0;
 }
 
-int hadoopZeroCopyCursorSetAllowShortReads(
-            struct hadoopZeroCopyCursor* zcursor, int allowShort)
+void hadoopRzOptionsFree(struct hadoopRzOptions *opts)
 {
-    JNIEnv* env;
-    jthrowable jthr;
-    jboolean shouldAllowShort = allowShort ? JNI_TRUE : JNI_FALSE;
-
+    JNIEnv *env;
     env = getJNIEnv();
-    if (env == NULL) {
-        errno = EINTERNAL;
-        return -1;
+    if (!env) {
+        return;
     }
-    jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor,
-        HADOOP_ZERO_COPY_CURSOR, "setAllowShortReads", "(Z)V",
-        shouldAllowShort);
-    if (jthr) {
-        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
-            "hadoopZeroCopyCursorSetAllowShortReads(): setAllowShortReads "
-            "failed");
-        return -1;
+    hadoopRzOptionsClearCached(env, opts);
+    if (opts->byteBufferPool) {
+        (*env)->DeleteGlobalRef(env, opts->byteBufferPool);
+        opts->byteBufferPool = NULL;
     }
-    return 0;
+    free(opts);
 }
 
-void hadoopZeroCopyCursorFree(struct hadoopZeroCopyCursor *zcursor)
+struct hadoopRzBuffer
 {
-    JNIEnv* env;
+    jobject byteBuffer;
+    uint8_t *ptr;
+    int32_t length;
+    int direct;
+};
+
+static jthrowable hadoopRzOptionsGetEnumSet(JNIEnv *env,
+        struct hadoopRzOptions *opts, jobject *enumSet)
+{
+    jthrowable jthr = NULL;
+    jobject enumInst = NULL, enumSetObj = NULL;
+    jvalue jVal;
+
+    if (opts->cachedEnumSet) {
+        // If we cached the value, return it now.
+        *enumSet = opts->cachedEnumSet;
+        goto done;
+    }
+    if (opts->skipChecksums) {
+        jthr = fetchEnumInstance(env, READ_OPTION,
+                  "SKIP_CHECKSUMS", &enumInst);
+        if (jthr) {
+            goto done;
+        }
+        jthr = invokeMethod(env, &jVal, STATIC, NULL,
+                "java/util/EnumSet", "of",
+                "(Ljava/lang/Enum;)Ljava/util/EnumSet;", enumInst);
+        if (jthr) {
+            goto done;
+        }
+        enumSetObj = jVal.l;
+    } else {
+        jclass clazz = (*env)->FindClass(env, READ_OPTION);
+        if (!clazz) {
+            jthr = newRuntimeError(env, "failed "
+                    "to find class for %s", READ_OPTION);
+            goto done;
+        }
+        jthr = invokeMethod(env, &jVal, STATIC, NULL,
+                "java/util/EnumSet", "noneOf",
+                "(Ljava/lang/Class;)Ljava/util/EnumSet;", clazz);
+        enumSetObj = jVal.l;
+    }
+    // create global ref
+    opts->cachedEnumSet = (*env)->NewGlobalRef(env, enumSetObj);
+    if (!opts->cachedEnumSet) {
+        jthr = getPendingExceptionAndClear(env);
+        goto done;
+    }
+    *enumSet = opts->cachedEnumSet;
+    jthr = NULL;
+done:
+    (*env)->DeleteLocalRef(env, enumInst);
+    (*env)->DeleteLocalRef(env, enumSetObj);
+    return jthr;
+}
+
+static int hadoopReadZeroExtractBuffer(JNIEnv *env,
+        const struct hadoopRzOptions *opts, struct hadoopRzBuffer *buffer)
+{
+    int ret;
     jthrowable jthr;
+    jvalue jVal;
+    uint8_t *directStart;
+    void *mallocBuf = NULL;
+    jint position;
+    jarray array = NULL;
 
-    env = getJNIEnv();
-    if (env == NULL) {
-        return;
+    jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
+                     "java/nio/ByteBuffer", "remaining", "()I");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hadoopReadZeroExtractBuffer: ByteBuffer#remaining failed: ");
+        goto done;
     }
-    jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor,
-                     HADOOP_ZERO_COPY_CURSOR, "close", "()V");
+    buffer->length = jVal.i;
+    jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
+                     "java/nio/ByteBuffer", "position", "()I");
     if (jthr) {
-        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
-                "hadoopZeroCopyCursorFree(): close failed");
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hadoopReadZeroExtractBuffer: ByteBuffer#position failed: ");
+        goto done;
+    }
+    position = jVal.i;
+    directStart = (*env)->GetDirectBufferAddress(env, buffer->byteBuffer);
+    if (directStart) {
+        // Handle direct buffers.
+        buffer->ptr = directStart + position;
+        buffer->direct = 1;
+        ret = 0;
+        goto done;
+    }
+    // Handle indirect buffers.
+    // The JNI docs don't say that GetDirectBufferAddress throws any exceptions
+    // when it fails.  However, they also don't clearly say that it doesn't.  It
+    // seems safest to clear any pending exceptions here, to prevent problems on
+    // various JVMs.
+    (*env)->ExceptionClear(env);
+    if (!opts->byteBufferPool) {
+        fputs("hadoopReadZeroExtractBuffer: we read through the "
+                "zero-copy path, but failed to get the address of the buffer via "
+                "GetDirectBufferAddress.  Please make sure your JVM supports "
+                "GetDirectBufferAddress.\n", stderr);
+        ret = ENOTSUP;
+        goto done;
+    }
+    // Get the backing array object of this buffer.
+    jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
+                     "java/nio/ByteBuffer", "array", "()[B");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hadoopReadZeroExtractBuffer: ByteBuffer#array failed: ");
+        goto done;
+    }
+    array = jVal.l;
+    if (!array) {
+        fputs("hadoopReadZeroExtractBuffer: ByteBuffer#array returned NULL.",
+              stderr);
+        ret = EIO;
+        goto done;
+    }
+    mallocBuf = malloc(buffer->length);
+    if (!mallocBuf) {
+        fprintf(stderr, "hadoopReadZeroExtractBuffer: failed to allocate %d bytes of memory\n",
+                buffer->length);
+        ret = ENOMEM;
+        goto done;
+    }
+    (*env)->GetByteArrayRegion(env, array, position, buffer->length, mallocBuf);
+    jthr = (*env)->ExceptionOccurred(env);
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hadoopReadZeroExtractBuffer: GetByteArrayRegion failed: ");
+        goto done;
     }
-    (*env)->DeleteGlobalRef(env, (jobject)zcursor);
+    buffer->ptr = mallocBuf;
+    buffer->direct = 0;
+    ret = 0;
+
+done:
+    free(mallocBuf);
+    (*env)->DeleteLocalRef(env, array);
+    return ret;
 }
 
-/**
- *  Translate an exception from ZeroCopyCursor#read, translate it into a return
- *  code.
- */
 static int translateZCRException(JNIEnv *env, jthrowable exc)
 {
     int ret;
@@ -2255,16 +2362,12 @@ static int translateZCRException(JNIEnv 
     jthrowable jthr = classNameOfObject(exc, env, &className);
 
     if (jthr) {
-        fprintf(stderr, "hadoopZeroCopyRead: unknown "
-                "exception from read().\n");
-        destroyLocalReference(env, jthr);
+        fputs("hadoopReadZero: failed to get class name of "
+                "exception from read().\n", stderr);
+        destroyLocalReference(env, exc);
         destroyLocalReference(env, jthr);
         ret = EIO;
         goto done;
-    } 
-    if (!strcmp(className, "java.io.EOFException")) {
-        ret = 0; // EOF
-        goto done;
     }
     if (!strcmp(className, "java.lang.UnsupportedOperationException")) {
         ret = EPROTONOSUPPORT;
@@ -2277,72 +2380,116 @@ done:
     return ret;
 }
 
-int32_t hadoopZeroCopyRead(struct hadoopZeroCopyCursor *zcursor,
-                           int32_t toRead, const void **data)
+struct hadoopRzBuffer* hadoopReadZero(hdfsFile file,
+            struct hadoopRzOptions *opts, int32_t maxLength)
 {
-    int32_t ret, nRead = -1;
-    JNIEnv* env;
-    jthrowable jthr;
-    jobject byteBuffer = NULL;
-    uint8_t *addr;
-    jint position;
+    JNIEnv *env;
+    jthrowable jthr = NULL;
     jvalue jVal;
-    
+    jobject enumSet = NULL, byteBuffer = NULL;
+    struct hadoopRzBuffer* buffer = NULL;
+    int ret;
+
     env = getJNIEnv();
-    if (env == NULL) {
+    if (!env) {
         errno = EINTERNAL;
-        return -1;
-    }
-    jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor,
-                     HADOOP_ZERO_COPY_CURSOR, "read", "(I)V", toRead);
-    if (jthr) {
-        ret = translateZCRException(env, jthr);
-        goto done;
+        return NULL;
     }
-    jthr = invokeMethod(env, &jVal, INSTANCE, (jobject)zcursor,
-                     HADOOP_ZERO_COPY_CURSOR, "getData",
-                     "()Ljava/nio/ByteBuffer;");
-    if (jthr) {
-        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
-                "hadoopZeroCopyRead(toRead=%"PRId32"): getData failed",
-                toRead);
+    if (file->type != INPUT) {
+        fputs("Cannot read from a non-InputStream object!\n", stderr);
+        ret = EINVAL;
         goto done;
     }
-    byteBuffer = jVal.l;
-    addr = (*env)->GetDirectBufferAddress(env, byteBuffer);
-    if (!addr) {
-        fprintf(stderr, "hadoopZeroCopyRead(toRead=%"PRId32"): "
-                    "failed to get direct buffer address.\n", toRead);
-        ret = EIO;
+    buffer = calloc(1, sizeof(struct hadoopRzBuffer));
+    if (!buffer) {
+        ret = ENOMEM;
         goto done;
     }
-    jthr = invokeMethod(env, &jVal, INSTANCE, byteBuffer,
-                     "java/nio/ByteBuffer", "position", "()I");
+    jthr = hadoopRzOptionsGetEnumSet(env, opts, &enumSet);
     if (jthr) {
         ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
-                "hadoopZeroCopyRead(toRead=%"PRId32"): ByteBuffer#position "
-                "failed", toRead);
+                "hadoopReadZero: hadoopRzOptionsGetEnumSet failed: ");
         goto done;
     }
-    position = jVal.i;
-    jthr = invokeMethod(env, &jVal, INSTANCE, byteBuffer,
-                     "java/nio/ByteBuffer", "remaining", "()I");
+    jthr = invokeMethod(env, &jVal, INSTANCE, file->file, HADOOP_ISTRM, "read",
+        "(Lorg/apache/hadoop/io/ByteBufferPool;ILjava/util/EnumSet;)"
+        "Ljava/nio/ByteBuffer;", opts->byteBufferPool, maxLength, enumSet);
     if (jthr) {
-        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
-                "hadoopZeroCopyRead(toRead=%"PRId32"): ByteBuffer#remaining "
-                "failed", toRead);
+        ret = translateZCRException(env, jthr);
         goto done;
     }
+    byteBuffer = jVal.l;
+    if (!byteBuffer) {
+        buffer->byteBuffer = NULL;
+        buffer->length = 0;
+        buffer->ptr = NULL;
+    } else {
+        buffer->byteBuffer = (*env)->NewGlobalRef(env, byteBuffer);
+        if (!buffer->byteBuffer) {
+            ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+                "hadoopReadZero: failed to create global ref to ByteBuffer");
+            goto done;
+        }
+        ret = hadoopReadZeroExtractBuffer(env, opts, buffer);
+        if (ret) {
+            goto done;
+        }
+    }
     ret = 0;
-    nRead = jVal.i;
-    *data = addr + position;
 done:
     (*env)->DeleteLocalRef(env, byteBuffer);
-    if (nRead == -1) {
+    if (ret) {
+        if (buffer) {
+            if (buffer->byteBuffer) {
+                (*env)->DeleteGlobalRef(env, buffer->byteBuffer);
+            }
+            free(buffer);
+        }
         errno = ret;
-        return -1;
+        return NULL;
+    } else {
+        errno = 0;
+    }
+    return buffer;
+}
+
+int32_t hadoopRzBufferLength(const struct hadoopRzBuffer *buffer)
+{
+    return buffer->length;
+}
+
+const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer)
+{
+    return buffer->ptr;
+}
+
+void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer)
+{
+    jvalue jVal;
+    jthrowable jthr;
+    JNIEnv* env;
+    
+    env = getJNIEnv();
+    if (env == NULL) {
+        errno = EINTERNAL;
+        return;
+    }
+    if (buffer->byteBuffer) {
+        jthr = invokeMethod(env, &jVal, INSTANCE, file->file,
+                    HADOOP_ISTRM, "releaseBuffer",
+                    "(Ljava/nio/ByteBuffer;)V", buffer->byteBuffer);
+        if (jthr) {
+            printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                    "hadoopRzBufferFree: releaseBuffer failed: ");
+            // even on error, we have to delete the reference.
+        }
+        (*env)->DeleteGlobalRef(env, buffer->byteBuffer);
+    }
+    if (!buffer->direct) {
+        free(buffer->ptr);
     }
-    return nRead;
+    memset(buffer, 0, sizeof(*buffer));
+    free(buffer);
 }
 
 char***
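
Given translateZCRException above, callers that do not configure a
ByteBufferPool should expect hadoopReadZero to return NULL with errno set to
EPROTONOSUPPORT whenever the zero-copy path is unavailable (for example, on a
non-local read).  A rough sketch of the resulting calling pattern follows; it
is illustrative only, assumes the usual <errno.h> and "hdfs.h" includes, and
abbreviates error handling.

    /* Hypothetical helper: try zero-copy first, fall back to hdfsRead. */
    static void readWithFallback(hdfsFS fs, hdfsFile file,
                                 struct hadoopRzOptions *opts)
    {
        char fallback[65536];
        struct hadoopRzBuffer *rzBuf =
            hadoopReadZero(file, opts, sizeof(fallback));
        if (!rzBuf) {
            if (errno == EPROTONOSUPPORT) {
                /* Zero-copy not possible and no ByteBufferPool configured:
                 * do an ordinary copying read instead. */
                tSize nRead = hdfsRead(fs, file, fallback, sizeof(fallback));
                (void)nRead;          /* consume the copied bytes here */
            }
            return;                   /* other errno values are real errors */
        }
        if (hadoopRzBufferGet(rzBuf) == NULL) {
            /* End-of-file: a zero-length buffer is still returned and must
             * still be released. */
        } else {
            /* Consume hadoopRzBufferLength(rzBuf) bytes starting at
             * hadoopRzBufferGet(rzBuf). */
        }
        hadoopRzBufferFree(file, rzBuf);
    }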

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h Tue Sep 24 21:40:53 2013
@@ -36,6 +36,8 @@
 #define EINTERNAL 255 
 #endif
 
+#define ELASTIC_BYTE_BUFFER_POOL_CLASS \
+  "org/apache/hadoop/io/ElasticByteBufferPool"
 
 /** All APIs set errno to meaningful values */
 
@@ -65,6 +67,10 @@ extern  "C" {
     struct hdfsFile_internal;
     typedef struct hdfsFile_internal* hdfsFile;
 
+    struct hadoopRzOptions;
+
+    struct hadoopRzBuffer;
+
     /**
      * Determine if a file is open for read.
      *
@@ -683,86 +689,104 @@ extern  "C" {
     int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime);
 
     /**
-     * Create a zero-copy cursor object.
+     * Allocate a zero-copy options structure.
      *
-     * @param file        The file to use for zero-copy reads.
+     * You must free all options structures allocated with this function using
+     * hadoopRzOptionsFree.
      *
-     * @return            The zero-copy cursor, or NULL + errno on failure.
+     * @return            A zero-copy options structure, or NULL if one could
+     *                    not be allocated.  If NULL is returned, errno will
+     *                    contain the error number.
      */
-    struct hadoopZeroCopyCursor* hadoopZeroCopyCursorAlloc(hdfsFile file);
+    struct hadoopRzOptions *hadoopRzOptionsAlloc(void);
 
     /**
-     * Set the fallback buffer which will be used by the zero copy object.
+     * Determine whether we should skip checksums in read0.
      *
-     * You are responsible for ensuring that this buffer stays valid until you
-     * either set a different buffer by calling this function again, or free the
-     * zero-copy cursor.
+     * @param opts        The options structure.
+     * @param skip        Nonzero to skip checksums sometimes; zero to always
+     *                    check them.
      *
-     * @param zcursor     The zero-copy cursor.
-     * @param cbuf        The buffer to use.
-     * @param size        Size of the buffer.
-     *
-     * @return            0 on success.  -1 on error.  Errno will be set on
-     *                    error. 
+     * @return            0 on success; -1 plus errno on failure.
      */
-    int hadoopZeroCopyCursorSetFallbackBuffer(
-              struct hadoopZeroCopyCursor* zcursor, void *cbuf, uint32_t size);
+    int hadoopRzOptionsSetSkipChecksum(
+            struct hadoopRzOptions *opts, int skip);
 
     /**
-     * Set whether our cursor should skip checksums or not.
+     * Set the ByteBufferPool to use with read0.
      *
-     * @param zcursor        The cursor
-     * @param skipChecksums  Nonzero to skip checksums.
+     * @param opts        The options structure.
+     * @param className   If this is NULL, we will not use any
+     *                    ByteBufferPool.  If this is non-NULL, it will be
+     *                    treated as the name of the pool class to use.
+     *                    For example, you can use
+     *                    ELASTIC_BYTE_BUFFER_POOL_CLASS.
      *
-     * @return               -1 on error, 0 otherwise.
+     * @return            0 if the ByteBufferPool class was found and
+     *                    instantiated;
+     *                    -1 plus errno otherwise.
      */
-    int hadoopZeroCopyCursorSetSkipChecksums(
-            struct hadoopZeroCopyCursor* zcursor, int skipChecksums);
+    int hadoopRzOptionsSetByteBufferPool(
+            struct hadoopRzOptions *opts, const char *className);
 
     /**
-     * Set whether our cursor should allow short reads to occur.
-     * Short reads will always occur if there is not enough data to read
-     * (i.e., at EOF), but normally we don't return them when reading other
-     * parts of the file.
-     *
-     * @param zcursor        The cursor
-     * @param skipChecksums  Nonzero to skip checksums.
+     * Free a hadoopRzOptions structure.
      *
-     * @return               -1 on error, 0 otherwise.
+     * @param opts        The options structure to free.
+     *                    Any associated ByteBufferPool will also be freed.
      */
-    int hadoopZeroCopyCursorSetAllowShortReads(
-                struct hadoopZeroCopyCursor* zcursor, int allowShort);
+    void hadoopRzOptionsFree(struct hadoopRzOptions *opts);
 
     /**
-     * Free zero-copy cursor.
+     * Perform a byte buffer read.
+     * If possible, this will be a zero-copy (mmap) read.
      *
-     * This will dispose of the cursor allocated by hadoopZeroCopyCursorAlloc, as
-     * well as any memory map that we have created.  You must be done with the
-     * data returned from hadoopZeroCopyRead before calling this.
+     * @param file       The file to read from.
+     * @param opts       An options structure created by hadoopRzOptionsAlloc.
+     * @param maxLength  The maximum length to read.  We may read fewer bytes
+     *                   than this length.
+     *
+     * @return           On success, returns a new hadoopRzBuffer.
+     *                   This buffer will continue to be valid and readable
+     *                   until it is released by hadoopRzBufferFree.  Failure to
+     *                   release a buffer will lead to a memory leak.
+     *
+     *                   NULL plus an errno code on an error.
+     *                   errno = EOPNOTSUPP indicates that we could not do a
+     *                   zero-copy read, and there was no ByteBufferPool
+     *                   supplied.
+     */
+    struct hadoopRzBuffer* hadoopReadZero(hdfsFile file,
+            struct hadoopRzOptions *opts, int32_t maxLength);
+
+    /**
+     * Determine the length of the buffer returned from readZero.
      *
-     * @param zcursor     The zero-copy cursor.
+     * @param buffer     a buffer returned from readZero.
+     * @return           the length of the buffer.
      */
-    void hadoopZeroCopyCursorFree(struct hadoopZeroCopyCursor *zcursor);
+    int32_t hadoopRzBufferLength(const struct hadoopRzBuffer *buffer);
 
-    /*
-     * Perform a zero-copy read.
+    /**
+     * Get a pointer to the raw buffer returned from readZero.
+     *
+     * To find out how many bytes this buffer contains, call
+     * hadoopRzBufferLength.
      *
-     * @param zcursor     The zero-copy cursor object.
-     * @param toRead      The maximum amount to read.
-     * @param data        (out param) on succesful return, a pointer to the
-     *                    data.  This pointer will remain valid until the next
-     *                    call to hadoopZeroCopyRead, or until
-     *                    hadoopZeroCopyCursorFree is called on zcursor.
+     * @param buffer     a buffer returned from readZero.
+     * @return           a pointer to the start of the buffer.  This will be
+     *                   NULL when end-of-file has been reached.
+     */
+    const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer);
+
+    /**
+     * Release a buffer obtained through readZero.
      *
-     * @return            -2 if zero-copy is unavailable, and 
-     *                    -1 if there was an error.  errno will be the error.
-     *                    0 if we hit end-of-file without reading anything.
-     *                    The positive number of bytes read otherwise.  Short
-     *                        reads will happen only if EOF is reached.
-     *                    The amount read otherwise.
+     * @param file       The hdfs stream that created this buffer.  This must be
+     *                   the same stream you called hadoopReadZero on.
+     * @param buffer     The buffer to release.
      */
-    int32_t hadoopZeroCopyRead(struct hadoopZeroCopyCursor *zcursor,
-                             int32_t toRead, const void **data);
+    void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer);
 
 #ifdef __cplusplus
 }
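
As the comments above describe, supplying a ByteBufferPool lets hadoopReadZero
fall back to a normal copying read into a pooled buffer instead of failing when
an mmap read is not possible.  A minimal sketch of configuring that, using the
ELASTIC_BYTE_BUFFER_POOL_CLASS constant defined at the top of this header
(illustrative only; error handling abbreviated):

    struct hadoopRzOptions *opts = hadoopRzOptionsAlloc();
    hadoopRzOptionsSetSkipChecksum(opts, 1);
    if (hadoopRzOptionsSetByteBufferPool(opts,
            ELASTIC_BYTE_BUFFER_POOL_CLASS)) {
        /* errno is set: the pool class could not be found or instantiated */
    }
    /* hadoopReadZero(file, opts, maxLength) will now copy into a pooled
     * buffer whenever a zero-copy read cannot be performed, rather than
     * returning NULL with errno EPROTONOSUPPORT. */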

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c Tue Sep 24 21:40:53 2013
@@ -647,3 +647,34 @@ done:
     (*env)->DeleteLocalRef(env, jvalue);
     return jthr;
 }
+
+jthrowable fetchEnumInstance(JNIEnv *env, const char *className,
+                         const char *valueName, jobject *out)
+{
+    jclass clazz;
+    jfieldID fieldId;
+    jobject jEnum;
+    char prettyClass[256];
+
+    clazz = (*env)->FindClass(env, className);
+    if (!clazz) {
+        return newRuntimeError(env, "fetchEnum(%s, %s): failed to find class.",
+                className, valueName);
+    }
+    if (snprintf(prettyClass, sizeof(prettyClass), "L%s;", className)
+          >= sizeof(prettyClass)) {
+        return newRuntimeError(env, "fetchEnum(%s, %s): class name too long.",
+                className, valueName);
+    }
+    fieldId = (*env)->GetStaticFieldID(env, clazz, valueName, prettyClass);
+    if (!fieldId) {
+        return getPendingExceptionAndClear(env);
+    }
+    jEnum = (*env)->GetStaticObjectField(env, clazz, fieldId);
+    if (!jEnum) {
+        return getPendingExceptionAndClear(env);
+    }
+    *out = jEnum;
+    return NULL;
+}
+

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h Tue Sep 24 21:40:53 2013
@@ -140,6 +140,21 @@ int javaObjectIsOfClass(JNIEnv *env, job
 jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
         const char *key, const char *value);
 
+/**
+ * Fetch an instance of an Enum.
+ *
+ * @param env               The JNI environment.
+ * @param className         The enum class name.
+ * @param valueName         The name of the enum value
+ * @param out               (out param) on success, a local reference to an
+ *                          instance of the enum object.  (Since Java enums are
+ *                          singletons, this is also the only instance.)
+ *
+ * @return                  NULL on success; exception otherwise
+ */
+jthrowable fetchEnumInstance(JNIEnv *env, const char *className,
+                             const char *valueName, jobject *out);
+
 #endif /*LIBHDFS_JNI_HELPER_H*/
 
 /**
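
A sketch of the expected call pattern for the new helper, here fetching
ReadOption.SKIP_CHECKSUMS as hdfs.c does above (illustrative; the JNIEnv
pointer and error reporting are assumed to come from the surrounding code):

    jobject skipChecksums = NULL;
    jthrowable jthr = fetchEnumInstance(env, "org/apache/hadoop/fs/ReadOption",
                                        "SKIP_CHECKSUMS", &skipChecksums);
    if (jthr) {
        /* translate or print the exception as the caller sees fit */
    } else {
        /* skipChecksums is a local reference to the singleton enum value;
         * drop it with DeleteLocalRef once it is no longer needed. */
        (*env)->DeleteLocalRef(env, skipChecksums);
    }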

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c Tue Sep 24 21:40:53 2013
@@ -81,40 +81,49 @@ static void printBuf(const uint8_t *buf,
 static int doTestZeroCopyReads(hdfsFS fs, const char *fileName)
 {
     hdfsFile file = NULL;
-    struct hadoopZeroCopyCursor *zcursor = NULL;
-    uint8_t *backingBuffer = NULL, *block;
-    const void *zcPtr;
+    struct hadoopRzOptions *opts = NULL;
+    struct hadoopRzBuffer *buffer = NULL;
+    uint8_t *block;
 
     file = hdfsOpenFile(fs, fileName, O_RDONLY, 0, 0, 0);
     EXPECT_NONNULL(file);
-    zcursor = hadoopZeroCopyCursorAlloc(file);
-    EXPECT_NONNULL(zcursor);
+    opts = hadoopRzOptionsAlloc();
+    EXPECT_NONNULL(opts);
+    EXPECT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 1));
     /* haven't read anything yet */
     EXPECT_ZERO(expectFileStats(file, 0LL, 0LL, 0LL, 0LL));
     block = getZeroCopyBlockData(0);
     EXPECT_NONNULL(block);
     /* first read is half of a block. */
+    buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2);
+    EXPECT_NONNULL(buffer);
     EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
-          hadoopZeroCopyRead(zcursor,
-                  TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2, &zcPtr));
-    EXPECT_ZERO(memcmp(zcPtr, block, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
+          hadoopRzBufferLength(buffer));
+    EXPECT_ZERO(memcmp(hadoopRzBufferGet(buffer), block,
+          TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
+    hadoopRzBufferFree(file, buffer);
     /* read the next half of the block */
+    buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2);
+    EXPECT_NONNULL(buffer);
     EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
-          hadoopZeroCopyRead(zcursor,
-                  TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2, &zcPtr));
-    EXPECT_ZERO(memcmp(zcPtr, block + (TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2),
-                       TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
+          hadoopRzBufferLength(buffer));
+    EXPECT_ZERO(memcmp(hadoopRzBufferGet(buffer),
+          block + (TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2),
+          TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
+    hadoopRzBufferFree(file, buffer);
     free(block);
     EXPECT_ZERO(expectFileStats(file, TEST_ZEROCOPY_FULL_BLOCK_SIZE, 
               TEST_ZEROCOPY_FULL_BLOCK_SIZE,
               TEST_ZEROCOPY_FULL_BLOCK_SIZE,
               TEST_ZEROCOPY_FULL_BLOCK_SIZE));
     /* Now let's read just a few bytes. */
-    EXPECT_INT_EQ(SMALL_READ_LEN,
-                  hadoopZeroCopyRead(zcursor, SMALL_READ_LEN, &zcPtr));
+    buffer = hadoopReadZero(file, opts, SMALL_READ_LEN);
+    EXPECT_NONNULL(buffer);
+    EXPECT_INT_EQ(SMALL_READ_LEN, hadoopRzBufferLength(buffer));
     block = getZeroCopyBlockData(1);
     EXPECT_NONNULL(block);
-    EXPECT_ZERO(memcmp(block, zcPtr, SMALL_READ_LEN));
+    EXPECT_ZERO(memcmp(block, hadoopRzBufferGet(buffer), SMALL_READ_LEN));
+    hadoopRzBufferFree(file, buffer);
     EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
                   hdfsTell(fs, file));
     EXPECT_ZERO(expectFileStats(file,
@@ -123,37 +132,36 @@ static int doTestZeroCopyReads(hdfsFS fs
           TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
           TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
 
-    /* Try to read a full block's worth of data.  This will cross the block
-     * boundary, which means we have to fall back to non-zero-copy reads.
-     * However, because we don't have a backing buffer, the fallback will fail
-     * with EPROTONOSUPPORT. */
-    EXPECT_INT_EQ(-1, 
-          hadoopZeroCopyRead(zcursor, TEST_ZEROCOPY_FULL_BLOCK_SIZE, &zcPtr));
+    /* Clear 'skip checksums' and test that we can't do zero-copy reads any
+     * more.  Since there is no ByteBufferPool set, we should fail with
+     * EPROTONOSUPPORT.
+     */
+    EXPECT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 0));
+    EXPECT_NULL(hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE));
     EXPECT_INT_EQ(EPROTONOSUPPORT, errno);
 
-    /* Now set a backing buffer and try again.  It should succeed this time. */
-    backingBuffer = malloc(ZC_BUF_LEN);
-    EXPECT_NONNULL(backingBuffer);
-    EXPECT_ZERO(hadoopZeroCopyCursorSetFallbackBuffer(zcursor,
-                                          backingBuffer, ZC_BUF_LEN));
-    EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE, 
-          hadoopZeroCopyRead(zcursor, TEST_ZEROCOPY_FULL_BLOCK_SIZE, &zcPtr));
+    /* Now set a ByteBufferPool and try again.  It should succeed this time. */
+    EXPECT_ZERO(hadoopRzOptionsSetByteBufferPool(opts,
+          ELASTIC_BYTE_BUFFER_POOL_CLASS));
+    buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE);
+    EXPECT_NONNULL(buffer);
+    EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE, hadoopRzBufferLength(buffer));
     EXPECT_ZERO(expectFileStats(file,
           (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
           (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
           (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
           TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
-    EXPECT_ZERO(memcmp(block + SMALL_READ_LEN, zcPtr,
+    EXPECT_ZERO(memcmp(block + SMALL_READ_LEN, hadoopRzBufferGet(buffer),
         TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN));
     free(block);
     block = getZeroCopyBlockData(2);
     EXPECT_NONNULL(block);
-    EXPECT_ZERO(memcmp(block, zcPtr +
+    EXPECT_ZERO(memcmp(block, hadoopRzBufferGet(buffer) +
         (TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN), SMALL_READ_LEN));
+    hadoopRzBufferFree(file, buffer);
     free(block);
-    hadoopZeroCopyCursorFree(zcursor);
+    hadoopRzOptionsFree(opts);
     EXPECT_ZERO(hdfsCloseFile(fs, file));
-    free(backingBuffer);
     return 0;
 }
 

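The rewritten test above exercises the hadoopRz* calls that replace the old zero-copy cursor API. As a rough sketch of the new read path (error handling trimmed; the hdfsFile and length are placeholders supplied by the caller):

    #include "hdfs.h"   /* hadoopRz* declarations and ELASTIC_BYTE_BUFFER_POOL_CLASS */

    /* Sketch: read up to maxLength bytes from an open file with the new API.
     * Returns the number of bytes obtained, or -1 on failure (errno set by libhdfs). */
    static int32_t readZeroCopySketch(hdfsFile file, int32_t maxLength)
    {
        struct hadoopRzOptions *opts;
        struct hadoopRzBuffer *buf;
        const void *data;
        int32_t len = -1;

        opts = hadoopRzOptionsAlloc();
        if (!opts)
            return -1;
        hadoopRzOptionsSetSkipChecksum(opts, 1);
        /* Without a ByteBufferPool, requests that cannot be satisfied by a
         * zero-copy mmap fail with errno == EPROTONOSUPPORT; setting a pool
         * enables the copying fallback, as the test demonstrates. */
        hadoopRzOptionsSetByteBufferPool(opts, ELASTIC_BYTE_BUFFER_POOL_CLASS);
        buf = hadoopReadZero(file, opts, maxLength);
        if (buf) {
            data = hadoopRzBufferGet(buf);
            len = hadoopRzBufferLength(buf);   /* may be shorter than maxLength */
            /* ... consume data[0 .. len) here ... */
            (void)data;
            hadoopRzBufferFree(file, buf);
        }
        hadoopRzOptionsFree(opts);
        return len;
    }
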
Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java?rev=1526020&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java (added)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java Tue Sep 24 21:40:53 2013
@@ -0,0 +1,530 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeoutException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.Random;
+
+import org.apache.commons.lang.SystemUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+
+/**
+ * This class tests if EnhancedByteBufferAccess works correctly.
+ */
+public class TestEnhancedByteBufferAccess {
+  private static final Log LOG =
+      LogFactory.getLog(TestEnhancedByteBufferAccess.class.getName());
+
+  static TemporarySocketDirectory sockDir;
+
+  @BeforeClass
+  public static void init() {
+    sockDir = new TemporarySocketDirectory();
+    DomainSocket.disableBindPathValidation();
+  }
+
+  private static byte[] byteBufferToArray(ByteBuffer buf) {
+    byte resultArray[] = new byte[buf.remaining()];
+    buf.get(resultArray);
+    buf.flip();
+    return resultArray;
+  }
+  
+  public static HdfsConfiguration initZeroCopyTest() {
+    Assume.assumeTrue(NativeIO.isAvailable());
+    Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3);
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100);
+    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+        new File(sockDir.getDir(),
+          "TestRequestMmapAccess._PORT.sock").getAbsolutePath());
+    conf.setBoolean(DFSConfigKeys.
+        DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
+    return conf;
+  }
+
+  @Test
+  public void testZeroCopyReads() throws Exception {
+    HdfsConfiguration conf = initZeroCopyTest();
+    MiniDFSCluster cluster = null;
+    final Path TEST_PATH = new Path("/a");
+    FSDataInputStream fsIn = null;
+    final int TEST_FILE_LENGTH = 12345;
+    
+    FileSystem fs = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          TEST_FILE_LENGTH, (short)1, 7567L);
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      fsIn = fs.open(TEST_PATH);
+      byte original[] = new byte[TEST_FILE_LENGTH];
+      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+      fsIn.close();
+      fsIn = fs.open(TEST_PATH);
+      ByteBuffer result = fsIn.read(null, 4096,
+          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+      Assert.assertEquals(4096, result.remaining());
+      HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
+      Assert.assertEquals(4096,
+          dfsIn.getReadStatistics().getTotalBytesRead());
+      Assert.assertEquals(4096,
+          dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
+      Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
+          byteBufferToArray(result));
+      fsIn.releaseBuffer(result);
+    } finally {
+      if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
+      if (cluster != null) cluster.shutdown();
+    }
+  }
+  
+  @Test
+  public void testShortZeroCopyReads() throws Exception {
+    HdfsConfiguration conf = initZeroCopyTest();
+    MiniDFSCluster cluster = null;
+    final Path TEST_PATH = new Path("/a");
+    FSDataInputStream fsIn = null;
+    final int TEST_FILE_LENGTH = 12345;
+    
+    FileSystem fs = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH, TEST_FILE_LENGTH, (short)1, 7567L);
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      fsIn = fs.open(TEST_PATH);
+      byte original[] = new byte[TEST_FILE_LENGTH];
+      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+      fsIn.close();
+      fsIn = fs.open(TEST_PATH);
+
+      // Try to read 8192, but only get 4096 because of the block size.
+      HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
+      ByteBuffer result =
+        dfsIn.read(null, 8192, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+      Assert.assertEquals(4096, result.remaining());
+      Assert.assertEquals(4096,
+          dfsIn.getReadStatistics().getTotalBytesRead());
+      Assert.assertEquals(4096,
+          dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
+      Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
+          byteBufferToArray(result));
+      dfsIn.releaseBuffer(result);
+      
+      // Try to read 4097, but only get 4096 because of the block size.
+      result = 
+          dfsIn.read(null, 4097, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+      Assert.assertEquals(4096, result.remaining());
+      Assert.assertArrayEquals(Arrays.copyOfRange(original, 4096, 8192),
+          byteBufferToArray(result));
+      dfsIn.releaseBuffer(result);
+    } finally {
+      if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
+      if (cluster != null) cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testZeroCopyReadsNoFallback() throws Exception {
+    HdfsConfiguration conf = initZeroCopyTest();
+    MiniDFSCluster cluster = null;
+    final Path TEST_PATH = new Path("/a");
+    FSDataInputStream fsIn = null;
+    final int TEST_FILE_LENGTH = 12345;
+    
+    FileSystem fs = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          TEST_FILE_LENGTH, (short)1, 7567L);
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      fsIn = fs.open(TEST_PATH);
+      byte original[] = new byte[TEST_FILE_LENGTH];
+      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+      fsIn.close();
+      fsIn = fs.open(TEST_PATH);
+      HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
+      ByteBuffer result;
+      try {
+        result = dfsIn.read(null, 4097, EnumSet.noneOf(ReadOption.class));
+        Assert.fail("expected UnsupportedOperationException");
+      } catch (UnsupportedOperationException e) {
+        // expected
+      }
+      result = dfsIn.read(null, 4096, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+      Assert.assertEquals(4096, result.remaining());
+      Assert.assertEquals(4096,
+          dfsIn.getReadStatistics().getTotalBytesRead());
+      Assert.assertEquals(4096,
+          dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
+      Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
+          byteBufferToArray(result));
+    } finally {
+      if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
+      if (cluster != null) cluster.shutdown();
+    }
+  }
+
+  private static class CountingVisitor
+      implements ClientMmapManager.ClientMmapVisitor {
+    int count = 0;
+
+    @Override
+    public void accept(ClientMmap mmap) {
+      count++;
+    }
+
+    public void reset() {
+      count = 0;
+    }
+  }
+
+  @Test
+  public void testZeroCopyMmapCache() throws Exception {
+    HdfsConfiguration conf = initZeroCopyTest();
+    MiniDFSCluster cluster = null;
+    final Path TEST_PATH = new Path("/a");
+    final int TEST_FILE_LENGTH = 16385;
+    final int RANDOM_SEED = 23453;
+    FSDataInputStream fsIn = null;
+    ByteBuffer results[] = { null, null, null, null, null };
+    
+    DistributedFileSystem fs = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      fsIn = fs.open(TEST_PATH);
+      byte original[] = new byte[TEST_FILE_LENGTH];
+      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+      fsIn.close();
+      fsIn = fs.open(TEST_PATH);
+      final ClientMmapManager mmapManager = fs.getClient().getMmapManager();
+      final CountingVisitor countingVisitor = new CountingVisitor();
+      mmapManager.visitMmaps(countingVisitor);
+      Assert.assertEquals(0, countingVisitor.count);
+      mmapManager.visitEvictable(countingVisitor);
+      Assert.assertEquals(0, countingVisitor.count);
+      results[0] = fsIn.read(null, 4096,
+          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+      fsIn.seek(0);
+      results[1] = fsIn.read(null, 4096,
+          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+      mmapManager.visitMmaps(countingVisitor);
+      Assert.assertEquals(1, countingVisitor.count);
+      countingVisitor.reset();
+      mmapManager.visitEvictable(countingVisitor);
+      Assert.assertEquals(0, countingVisitor.count);
+      countingVisitor.reset();
+
+      // The mmaps should be of the first block of the file.
+      final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+      mmapManager.visitMmaps(new ClientMmapManager.ClientMmapVisitor() {
+        @Override
+        public void accept(ClientMmap mmap) {
+          Assert.assertEquals(firstBlock, mmap.getBlock());
+        }
+      });
+
+      // Read more blocks.
+      results[2] = fsIn.read(null, 4096,
+          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+      results[3] = fsIn.read(null, 4096,
+          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+      try {
+        results[4] = fsIn.read(null, 4096,
+            EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+        Assert.fail("expected UnsupportedOperationException");
+      } catch (UnsupportedOperationException e) {
+        // expected
+      }
+
+      // we should have 3 mmaps, 0 evictable
+      mmapManager.visitMmaps(countingVisitor);
+      Assert.assertEquals(3, countingVisitor.count);
+      countingVisitor.reset();
+      mmapManager.visitEvictable(countingVisitor);
+      Assert.assertEquals(0, countingVisitor.count);
+
+      // After we release the buffers, the mmaps should be evictable for
+      // a brief period of time.  Then, they should be closed (we're
+      // using a very quick timeout).
+      for (ByteBuffer buffer : results) {
+        if (buffer != null) {
+          fsIn.releaseBuffer(buffer);
+        }
+      }
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        public Boolean get() {
+          countingVisitor.reset();
+          try {
+            mmapManager.visitEvictable(countingVisitor);
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+            return false;
+          }
+          return (0 == countingVisitor.count);
+        }
+      }, 10, 10000);
+      countingVisitor.reset();
+      mmapManager.visitMmaps(countingVisitor);
+      Assert.assertEquals(0, countingVisitor.count);
+    } finally {
+      if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
+      if (cluster != null) cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test HDFS fallback reads.  HDFS streams support the ByteBufferReadable
+   * interface.
+   */
+  @Test
+  public void testHdfsFallbackReads() throws Exception {
+    HdfsConfiguration conf = initZeroCopyTest();
+    MiniDFSCluster cluster = null;
+    final Path TEST_PATH = new Path("/a");
+    final int TEST_FILE_LENGTH = 16385;
+    final int RANDOM_SEED = 23453;
+    FSDataInputStream fsIn = null;
+    
+    DistributedFileSystem fs = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      fsIn = fs.open(TEST_PATH);
+      byte original[] = new byte[TEST_FILE_LENGTH];
+      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+      fsIn.close();
+      fsIn = fs.open(TEST_PATH);
+      testFallbackImpl(fsIn, original);
+    } finally {
+      if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
+      if (cluster != null) cluster.shutdown();
+    }
+  }
+
+  private static class RestrictedAllocatingByteBufferPool
+      implements ByteBufferPool {
+    private final boolean direct;
+
+    RestrictedAllocatingByteBufferPool(boolean direct) {
+      this.direct = direct;
+    }
+    @Override
+    public ByteBuffer getBuffer(boolean direct, int length) {
+      Preconditions.checkArgument(this.direct == direct);
+      return direct ? ByteBuffer.allocateDirect(length) :
+        ByteBuffer.allocate(length);
+    }
+    @Override
+    public void putBuffer(ByteBuffer buffer) {
+    }
+  }
+  
+  private static void testFallbackImpl(InputStream stream,
+      byte original[]) throws Exception {
+    RestrictedAllocatingByteBufferPool bufferPool =
+        new RestrictedAllocatingByteBufferPool(
+            stream instanceof ByteBufferReadable);
+
+    ByteBuffer result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10);
+    Assert.assertEquals(10, result.remaining());
+    Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 10),
+        byteBufferToArray(result));
+
+    result = ByteBufferUtil.fallbackRead(stream, bufferPool, 5000);
+    Assert.assertEquals(5000, result.remaining());
+    Assert.assertArrayEquals(Arrays.copyOfRange(original, 10, 5010),
+        byteBufferToArray(result));
+
+    result = ByteBufferUtil.fallbackRead(stream, bufferPool, 9999999);
+    Assert.assertEquals(11375, result.remaining());
+    Assert.assertArrayEquals(Arrays.copyOfRange(original, 5010, 16385),
+        byteBufferToArray(result));
+
+    result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10);
+    Assert.assertNull(result);
+  }
+
+  /**
+   * Test the {@link ByteBufferUtil#fallbackRead} function directly.
+   */
+  @Test
+  public void testFallbackRead() throws Exception {
+    HdfsConfiguration conf = initZeroCopyTest();
+    MiniDFSCluster cluster = null;
+    final Path TEST_PATH = new Path("/a");
+    final int TEST_FILE_LENGTH = 16385;
+    final int RANDOM_SEED = 23453;
+    FSDataInputStream fsIn = null;
+    
+    DistributedFileSystem fs = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      fsIn = fs.open(TEST_PATH);
+      byte original[] = new byte[TEST_FILE_LENGTH];
+      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+      fsIn.close();
+      fsIn = fs.open(TEST_PATH);
+      testFallbackImpl(fsIn, original);
+    } finally {
+      if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
+      if (cluster != null) cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test fallback reads on a stream which does not support the
+   * ByteBufferReadable interface.
+   */
+  @Test
+  public void testIndirectFallbackReads() throws Exception {
+    final File TEST_DIR = new File(
+      System.getProperty("test.build.data","build/test/data"));
+    final String TEST_PATH = TEST_DIR + File.separator +
+        "indirectFallbackTestFile";
+    final int TEST_FILE_LENGTH = 16385;
+    final int RANDOM_SEED = 23453;
+    FileOutputStream fos = null;
+    FileInputStream fis = null;
+    try {
+      fos = new FileOutputStream(TEST_PATH);
+      Random random = new Random(RANDOM_SEED);
+      byte original[] = new byte[TEST_FILE_LENGTH];
+      random.nextBytes(original);
+      fos.write(original);
+      fos.close();
+      fos = null;
+      fis = new FileInputStream(TEST_PATH);
+      testFallbackImpl(fis, original);
+    } finally {
+      IOUtils.cleanup(LOG, fos, fis);
+      new File(TEST_PATH).delete();
+    }
+  }
+}