Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2013/09/24 23:40:54 UTC
svn commit: r1526020 [1/2] - in
/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/client/ src/main/native/libhdfs/
src/main/native/libhdfs/test/ src/test/jav...
Author: cmccabe
Date: Tue Sep 24 21:40:53 2013
New Revision: 1526020
URL: http://svn.apache.org/r1526020
Log:
HDFS-5191. Revisit zero-copy API in FSDataInputStream to make it more intuitive (Contributed by Colin Patrick McCabe)
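For orientation, here is a minimal sketch (not part of the commit itself) of how the reworked libhdfs calls fit together, using only functions declared in the new hdfs.h further below; the path and read size are hypothetical and error handling is abbreviated:

    #include "hdfs.h"
    #include <stdio.h>

    /* Sketch: one zero-copy read with the new hadoopRz* API. */
    static int demoZeroCopyRead(hdfsFS fs)
    {
        hdfsFile file = hdfsOpenFile(fs, "/a", O_RDONLY, 0, 0, 0);
        struct hadoopRzOptions *opts = hadoopRzOptionsAlloc();
        struct hadoopRzBuffer *buffer;

        hadoopRzOptionsSetSkipChecksum(opts, 1);   /* the mmap path requires this */
        buffer = hadoopReadZero(file, opts, 4096);
        if (!buffer) {
            perror("hadoopReadZero");
            return -1;
        }
        printf("read %d bytes\n", (int)hadoopRzBufferLength(buffer));
        hadoopRzBufferFree(file, buffer);          /* every buffer must be released */
        hadoopRzOptionsFree(opts);
        hdfsCloseFile(fs, file);
        return 0;
    }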
Added:
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
Removed:
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsZeroCopyCursor.java
Modified:
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt Tue Sep 24 21:40:53 2013
@@ -45,6 +45,9 @@ HDFS-4949 (Unreleased)
HDFS-5236. Change PathBasedCacheDirective APIs to be a single value
rather than batch. (Contributed by Andrew Wang)
+ HDFS-5191. Revisit zero-copy API in FSDataInputStream to make it more
+ intuitive. (Contributed by Colin Patrick McCabe)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Tue Sep 24 21:40:53 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
import java.io.IOException;
import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -28,6 +29,7 @@ import org.apache.hadoop.hdfs.protocol.L
* from a single datanode.
*/
public interface BlockReader extends ByteBufferReadable {
+
/* same interface as inputStream java.io.InputStream#read()
* used by DFSInputStream#read()
@@ -85,19 +87,12 @@ public interface BlockReader extends Byt
boolean isShortCircuit();
/**
- * Do a zero-copy read with the current block reader.
- *
- * We assume that the calling code has done bounds checking, and won't ask
- * us for more bytes than are supposed to be visible (or are in the file).
+ * Get a ClientMmap object for this BlockReader.
*
- * @param buffers The zero-copy buffers object.
* @param curBlock The current block.
- * @param blockPos Position in the current block to start reading at.
- * @param toRead The number of bytes to read from the block.
- *
- * @return true if the read was done, false otherwise.
+ * @return The ClientMmap object, or null if mmap is not
+ * supported.
*/
- boolean readZeroCopy(HdfsZeroCopyCursor buffers,
- LocatedBlock curBlock, long blockPos, int toRead,
+ ClientMmap getClientMmap(LocatedBlock curBlock,
ClientMmapManager mmapManager);
}
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Tue Sep 24 21:40:53 2013
@@ -548,46 +548,28 @@ class BlockReaderLocal implements BlockR
}
@Override
- public boolean readZeroCopy(HdfsZeroCopyCursor cursor,
- LocatedBlock curBlock, long blockPos, int toRead,
- ClientMmapManager mmapManager) {
+ public ClientMmap getClientMmap(LocatedBlock curBlock,
+ ClientMmapManager mmapManager) {
if (clientMmap == null) {
if (mmapDisabled) {
- return false;
+ return null;
}
try {
clientMmap = mmapManager.fetch(datanodeID, block, dataIn);
if (clientMmap == null) {
mmapDisabled = true;
- return false;
+ return null;
}
} catch (InterruptedException e) {
LOG.error("Interrupted while setting up mmap for " + filename, e);
Thread.currentThread().interrupt();
- return false;
+ return null;
} catch (IOException e) {
LOG.error("unable to set up mmap for " + filename, e);
mmapDisabled = true;
- return false;
+ return null;
}
}
- long limit = blockPos + toRead;
- if (limit > Integer.MAX_VALUE) {
- /*
- * In Java, ByteBuffers use a 32-bit length, capacity, offset, etc.
- * This limits our mmap'ed regions to 2 GB in length.
- * TODO: we can implement zero-copy for larger blocks by doing multiple
- * mmaps.
- */
- mmapDisabled = true;
- clientMmap.unref();
- clientMmap = null;
- return false;
- }
- ByteBuffer mmapBuffer = clientMmap.getMappedByteBuffer().duplicate();
- mmapBuffer.position((int)blockPos);
- mmapBuffer.limit((int)limit);
- cursor.setMmap(clientMmap, mmapBuffer);
- return true;
+ return clientMmap;
}
}
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java Tue Sep 24 21:40:53 2013
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -705,9 +706,8 @@ class BlockReaderLocalLegacy implements
}
@Override
- public boolean readZeroCopy(HdfsZeroCopyCursor buffers,
- LocatedBlock curBlock, long blockPos, int toRead,
+ public ClientMmap getClientMmap(LocatedBlock curBlock,
ClientMmapManager mmapManager) {
- return false;
+ return null;
}
}
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Tue Sep 24 21:40:53 2013
@@ -2610,7 +2610,8 @@ public class DFSClient implements java.i
return defaultWriteCachingStrategy;
}
- ClientMmapManager getMmapManager() {
+ @VisibleForTesting
+ public ClientMmapManager getMmapManager() {
return mmapManager;
}
}
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Tue Sep 24 21:40:53 2013
@@ -24,6 +24,7 @@ import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -36,12 +37,15 @@ import java.util.concurrent.ConcurrentHa
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.ByteBufferUtil;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
+import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.fs.ZeroCopyCursor;
+import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
@@ -55,12 +59,14 @@ import org.apache.hadoop.hdfs.security.t
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
+import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.IdentityHashStore;
import com.google.common.annotations.VisibleForTesting;
@@ -70,7 +76,8 @@ import com.google.common.annotations.Vis
****************************************************************/
@InterfaceAudience.Private
public class DFSInputStream extends FSInputStream
-implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
+implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
+ HasEnhancedByteBufferAccess {
@VisibleForTesting
static boolean tcpReadsDisabledForTesting = false;
private final PeerCache peerCache;
@@ -88,6 +95,15 @@ implements ByteBufferReadable, CanSetDro
private CachingStrategy cachingStrategy;
private final ReadStatistics readStatistics = new ReadStatistics();
+ /**
+ * Track the ByteBuffers that we have handed out to readers.
+ *
+ * The value type can be either ByteBufferPool or ClientMmap, depending on
+ * whether this is a memory-mapped buffer or not.
+ */
+ private final IdentityHashStore<ByteBuffer, Object>
+ extendedReadBuffers = new IdentityHashStore<ByteBuffer, Object>(0);
+
public static class ReadStatistics {
public ReadStatistics() {
this.totalBytesRead = 0;
@@ -606,6 +622,20 @@ implements ByteBufferReadable, CanSetDro
}
dfsClient.checkOpen();
+ if (!extendedReadBuffers.isEmpty()) {
+ final StringBuilder builder = new StringBuilder();
+ extendedReadBuffers.visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() {
+ private String prefix = "";
+ @Override
+ public void accept(ByteBuffer k, Object v) {
+ builder.append(prefix).append(k);
+ prefix = ", ";
+ }
+ });
+ DFSClient.LOG.warn("closing file " + src + ", but there are still " +
+ "unreleased ByteBuffers allocated by read(). " +
+ "Please release " + builder.toString() + ".");
+ }
if (blockReader != null) {
blockReader.close();
blockReader = null;
@@ -1413,9 +1443,11 @@ implements ByteBufferReadable, CanSetDro
closeCurrentBlockReader();
}
- synchronized void readZeroCopy(HdfsZeroCopyCursor zcursor, int toRead)
- throws IOException {
- assert(toRead > 0);
+ @Override
+ public synchronized ByteBuffer read(ByteBufferPool bufferPool,
+ int maxLength, EnumSet<ReadOption> opts)
+ throws IOException, UnsupportedOperationException {
+ assert(maxLength > 0);
if (((blockReader == null) || (blockEnd == -1)) &&
(pos < getFileLength())) {
/*
@@ -1429,50 +1461,81 @@ implements ByteBufferReadable, CanSetDro
"at position " + pos);
}
}
+ boolean canSkipChecksums = opts.contains(ReadOption.SKIP_CHECKSUMS);
+ if (canSkipChecksums) {
+ ByteBuffer buffer = tryReadZeroCopy(maxLength);
+ if (buffer != null) {
+ return buffer;
+ }
+ }
+ ByteBuffer buffer = ByteBufferUtil.
+ fallbackRead(this, bufferPool, maxLength);
+ if (buffer != null) {
+ extendedReadBuffers.put(buffer, bufferPool);
+ }
+ return buffer;
+ }
+
+ private synchronized ByteBuffer tryReadZeroCopy(int maxLength)
+ throws IOException {
+ // Java ByteBuffers can't be longer than 2 GB, because they use
+ // 4-byte signed integers to represent capacity, etc.
+ // So we can't mmap the parts of the block higher than the 2 GB offset.
+ // FIXME: we could work around this with multiple memory maps.
+ // See HDFS-5101.
+ long blockEnd32 = Math.min(Integer.MAX_VALUE, blockEnd);
long curPos = pos;
- boolean canSkipChecksums = zcursor.getSkipChecksums();
- long blockLeft = blockEnd - curPos + 1;
- if (zcursor.getAllowShortReads()) {
- if (blockLeft < toRead) {
- toRead = (int)blockLeft;
+ long blockLeft = blockEnd32 - curPos + 1;
+ if (blockLeft <= 0) {
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
+ curPos + " of " + src + "; blockLeft = " + blockLeft +
+ "; blockEnd32 = " + blockEnd32 + ", blockEnd = " + blockEnd +
+ "; maxLength = " + maxLength);
}
+ return null;
}
- if (canSkipChecksums && (toRead <= blockLeft)) {
- long blockStartInFile = currentLocatedBlock.getStartOffset();
- long blockPos = curPos - blockStartInFile;
- if (blockReader.readZeroCopy(zcursor,
- currentLocatedBlock, blockPos, toRead,
- dfsClient.getMmapManager())) {
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("readZeroCopy read " + toRead + " bytes from " +
- "offset " + curPos + " via the zero-copy read path. " +
- "blockEnd = " + blockEnd);
- }
- readStatistics.addZeroCopyBytes(toRead);
- seek(pos + toRead);
- return;
+ int length = Math.min((int)blockLeft, maxLength);
+ long blockStartInFile = currentLocatedBlock.getStartOffset();
+ long blockPos = curPos - blockStartInFile;
+ long limit = blockPos + length;
+ ClientMmap clientMmap =
+ blockReader.getClientMmap(currentLocatedBlock,
+ dfsClient.getMmapManager());
+ if (clientMmap == null) {
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
+ curPos + " of " + src + "; BlockReader#getClientMmap returned " +
+ "null.");
}
+ return null;
}
- /*
- * Slow path reads.
- *
- * readStatistics will be updated when we call back into this
- * stream's read methods.
- */
- long prevBlockEnd = blockEnd;
- int slowReadAmount = zcursor.readViaSlowPath(toRead);
+ seek(pos + length);
+ ByteBuffer buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
+ buffer.position((int)blockPos);
+ buffer.limit((int)limit);
+ clientMmap.ref();
+ extendedReadBuffers.put(buffer, clientMmap);
+ readStatistics.addZeroCopyBytes(length);
if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("readZeroCopy read " + slowReadAmount + " bytes " +
- "from offset " + curPos + " via the fallback read path. " +
- "prevBlockEnd = " + prevBlockEnd + ", blockEnd = " + blockEnd +
- ", canSkipChecksums = " + canSkipChecksums);
+ DFSClient.LOG.debug("readZeroCopy read " + length + " bytes from " +
+ "offset " + curPos + " via the zero-copy read path. " +
+ "blockEnd = " + blockEnd);
}
+ return buffer;
}
@Override
- public ZeroCopyCursor createZeroCopyCursor()
- throws IOException, UnsupportedOperationException {
- return new HdfsZeroCopyCursor(this,
- dfsClient.getConf().skipShortCircuitChecksums);
+ public synchronized void releaseBuffer(ByteBuffer buffer) {
+ Object val = extendedReadBuffers.remove(buffer);
+ if (val == null) {
+ throw new IllegalArgumentException("tried to release a buffer " +
+ "that was not created by this stream, " + buffer);
+ }
+ if (val instanceof ClientMmap) {
+ ((ClientMmap)val).unref();
+ } else if (val instanceof ByteBufferPool) {
+ ((ByteBufferPool)val).putBuffer(buffer);
+ }
}
}
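The Java changes above define the contract that the libhdfs wrappers later in this patch rely on: the mmap path is only attempted when ReadOption.SKIP_CHECKSUMS is set, the fallback path needs a ByteBufferPool, and every returned buffer is tracked in extendedReadBuffers until releaseBuffer is called. A hedged C sketch of the resulting caller obligations (assuming an open hdfsFile named file; these functions appear in the new hdfs.h below):

    #include "hdfs.h"
    #include <errno.h>

    /* Sketch: with checksums enabled and no pool, hadoopReadZero fails with
     * EPROTONOSUPPORT; setting a ByteBufferPool enables the fallback read. */
    static void demoFallbackRead(hdfsFile file)
    {
        struct hadoopRzOptions *opts = hadoopRzOptionsAlloc();
        struct hadoopRzBuffer *buffer;

        hadoopRzOptionsSetSkipChecksum(opts, 0);
        buffer = hadoopReadZero(file, opts, 4096);
        if (!buffer && errno == EPROTONOSUPPORT) {
            /* No zero-copy and no pool: supply a pool and retry. */
            hadoopRzOptionsSetByteBufferPool(opts, ELASTIC_BYTE_BUFFER_POOL_CLASS);
            buffer = hadoopReadZero(file, opts, 4096);
        }
        if (buffer) {
            /* ... use hadoopRzBufferGet(buffer) ... */
            hadoopRzBufferFree(file, buffer);  /* unreleased buffers are reported
                                                  when the stream is closed */
        }
        hadoopRzOptionsFree(opts);
    }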
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Tue Sep 24 21:40:53 2013
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -489,9 +490,8 @@ public class RemoteBlockReader extends F
}
@Override
- public boolean readZeroCopy(HdfsZeroCopyCursor buffers,
- LocatedBlock curBlock, long blockPos, int toRead,
+ public ClientMmap getClientMmap(LocatedBlock curBlock,
ClientMmapManager mmapManager) {
- return false;
+ return null;
}
}
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Tue Sep 24 21:40:53 2013
@@ -29,6 +29,7 @@ import java.nio.channels.ReadableByteCha
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -454,9 +455,8 @@ public class RemoteBlockReader2 impleme
}
@Override
- public boolean readZeroCopy(HdfsZeroCopyCursor buffers,
- LocatedBlock curBlock, long blockPos, int toRead,
+ public ClientMmap getClientMmap(LocatedBlock curBlock,
ClientMmapManager manager) {
- return false;
+ return null;
}
}
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java Tue Sep 24 21:40:53 2013
@@ -361,6 +361,10 @@ public class ClientMmapManager implement
}
waitable.provide(mmap);
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("created a new ClientMmap for block " + key.block +
+ " on datanode " + key.datanode);
+ }
return mmap;
}
@@ -403,8 +407,10 @@ public class ClientMmapManager implement
finally {
lock.unlock();
}
- LOG.debug("reusing existing mmap with datanodeID=" + datanodeID +
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("reusing existing mmap with datanodeID=" + datanodeID +
", " + "block=" + block);
+ }
return mmap;
}
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c Tue Sep 24 21:40:53 2013
@@ -32,6 +32,22 @@ int expectFileStats(hdfsFile file,
{
struct hdfsReadStatistics *stats = NULL;
EXPECT_ZERO(hdfsFileGetReadStatistics(file, &stats));
+ fprintf(stderr, "expectFileStats(expectedTotalBytesRead=%"PRId64", "
+ "expectedTotalLocalBytesRead=%"PRId64", "
+ "expectedTotalShortCircuitBytesRead=%"PRId64", "
+ "expectedTotalZeroCopyBytesRead=%"PRId64", "
+ "totalBytesRead=%"PRId64", "
+ "totalLocalBytesRead=%"PRId64", "
+ "totalShortCircuitBytesRead=%"PRId64", "
+ "totalZeroCopyBytesRead=%"PRId64")\n",
+ expectedTotalBytesRead,
+ expectedTotalLocalBytesRead,
+ expectedTotalShortCircuitBytesRead,
+ expectedTotalZeroCopyBytesRead,
+ stats->totalBytesRead,
+ stats->totalLocalBytesRead,
+ stats->totalShortCircuitBytesRead,
+ stats->totalZeroCopyBytesRead);
if (expectedTotalBytesRead != UINT64_MAX) {
EXPECT_INT64_EQ(expectedTotalBytesRead, stats->totalBytesRead);
}
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c Tue Sep 24 21:40:53 2013
@@ -39,7 +39,7 @@
#define JAVA_NET_ISA "java/net/InetSocketAddress"
#define JAVA_NET_URI "java/net/URI"
#define JAVA_STRING "java/lang/String"
-#define HADOOP_ZERO_COPY_CURSOR "org/apache/hadoop/fs/ZeroCopyCursor"
+#define READ_OPTION "org/apache/hadoop/fs/ReadOption"
#define JAVA_VOID "V"
@@ -2103,151 +2103,258 @@ int hdfsUtime(hdfsFS fs, const char* pat
return 0;
}
-struct hadoopZeroCopyCursor* hadoopZeroCopyCursorAlloc(hdfsFile file)
+/**
+ * Zero-copy options.
+ *
+ * We cache the EnumSet of ReadOptions which has to be passed into every
+ * readZero call, to avoid reconstructing it each time. This cache is cleared
+ * whenever an element changes.
+ */
+struct hadoopRzOptions
{
- int ret;
- jobject zcursor = NULL;
- jvalue jVal;
- jthrowable jthr;
- JNIEnv* env;
+ JNIEnv *env;
+ int skipChecksums;
+ jobject byteBufferPool;
+ jobject cachedEnumSet;
+};
+
+struct hadoopRzOptions *hadoopRzOptionsAlloc(void)
+{
+ struct hadoopRzOptions *opts;
+ JNIEnv *env;
env = getJNIEnv();
- if (env == NULL) {
+ if (!env) {
+ // Check to make sure the JNI environment is set up properly.
errno = EINTERNAL;
return NULL;
}
- if (file->type != INPUT) {
- ret = EINVAL;
- goto done;
- }
- jthr = invokeMethod(env, &jVal, INSTANCE, (jobject)file->file, HADOOP_ISTRM,
- "createZeroCopyCursor", "()L"HADOOP_ZERO_COPY_CURSOR";");
- if (jthr) {
- ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
- "hadoopZeroCopyCursorAlloc: createZeroCopyCursor");
- goto done;
- }
- zcursor = (*env)->NewGlobalRef(env, jVal.l);
- if (!zcursor) {
- ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
- "hadoopZeroCopyCursorAlloc: NewGlobalRef");
- }
- ret = 0;
-done:
- if (ret) {
- errno = ret;
+ opts = calloc(1, sizeof(struct hadoopRzOptions));
+ if (!opts) {
+ errno = ENOMEM;
+ return NULL;
}
- return (struct hadoopZeroCopyCursor*)zcursor;
+ return opts;
}
-int hadoopZeroCopyCursorSetFallbackBuffer(struct hadoopZeroCopyCursor* zcursor,
- void *cbuf, uint32_t size)
+static void hadoopRzOptionsClearCached(JNIEnv *env,
+ struct hadoopRzOptions *opts)
{
- int ret;
- jobject buffer = NULL;
- jthrowable jthr;
- JNIEnv* env;
+ if (!opts->cachedEnumSet) {
+ return;
+ }
+ (*env)->DeleteGlobalRef(env, opts->cachedEnumSet);
+ opts->cachedEnumSet = NULL;
+}
+int hadoopRzOptionsSetSkipChecksum(
+ struct hadoopRzOptions *opts, int skip)
+{
+ JNIEnv *env;
env = getJNIEnv();
- if (env == NULL) {
+ if (!env) {
errno = EINTERNAL;
return -1;
}
- buffer = (*env)->NewDirectByteBuffer(env, cbuf, size);
- if (!buffer) {
- ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
- "hadoopZeroCopyCursorSetFallbackBuffer: NewDirectByteBuffer("
- "size=%"PRId32"):", size);
- goto done;
- }
- jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor,
- HADOOP_ZERO_COPY_CURSOR, "setFallbackBuffer",
- "(Ljava/nio/ByteBuffer;)V", buffer);
- if (jthr) {
- ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
- "hadoopZeroCopyCursorSetFallbackBuffer: "
- "FileSystem#setFallbackBuffer");
- goto done;
- }
- ret = 0;
-done:
- if (ret) {
- (*env)->DeleteLocalRef(env, buffer);
- errno = ret;
- return -1;
- }
+ hadoopRzOptionsClearCached(env, opts);
+ opts->skipChecksums = !!skip;
return 0;
}
-int hadoopZeroCopyCursorSetSkipChecksums(struct hadoopZeroCopyCursor* zcursor,
- int skipChecksums)
+int hadoopRzOptionsSetByteBufferPool(
+ struct hadoopRzOptions *opts, const char *className)
{
- JNIEnv* env;
+ JNIEnv *env;
jthrowable jthr;
- jboolean shouldSkipChecksums = skipChecksums ? JNI_TRUE : JNI_FALSE;
+ jobject byteBufferPool = NULL;
env = getJNIEnv();
- if (env == NULL) {
+ if (!env) {
errno = EINTERNAL;
return -1;
}
- jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor,
- HADOOP_ZERO_COPY_CURSOR, "setSkipChecksums", "(Z)V",
- shouldSkipChecksums);
+
+ // Note: we don't have to call hadoopRzOptionsClearCached in this
+ // function, since the ByteBufferPool is passed separately from the
+ // EnumSet of ReadOptions.
+
+ jthr = constructNewObjectOfClass(env, &byteBufferPool, className, "()V");
if (jthr) {
- errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
- "hadoopZeroCopyCursorSetSkipChecksums(): setSkipChecksums failed");
+ printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hadoopRzOptionsSetByteBufferPool(className=%s): ", className);
+ errno = EINVAL;
return -1;
}
+ if (opts->byteBufferPool) {
+ // Delete any previous ByteBufferPool we had.
+ (*env)->DeleteGlobalRef(env, opts->byteBufferPool);
+ }
+ opts->byteBufferPool = byteBufferPool;
return 0;
}
-int hadoopZeroCopyCursorSetAllowShortReads(
- struct hadoopZeroCopyCursor* zcursor, int allowShort)
+void hadoopRzOptionsFree(struct hadoopRzOptions *opts)
{
- JNIEnv* env;
- jthrowable jthr;
- jboolean shouldAllowShort = allowShort ? JNI_TRUE : JNI_FALSE;
-
+ JNIEnv *env;
env = getJNIEnv();
- if (env == NULL) {
- errno = EINTERNAL;
- return -1;
+ if (!env) {
+ return;
}
- jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor,
- HADOOP_ZERO_COPY_CURSOR, "setAllowShortReads", "(Z)V",
- shouldAllowShort);
- if (jthr) {
- errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
- "hadoopZeroCopyCursorSetAllowShortReads(): setAllowShortReads "
- "failed");
- return -1;
+ hadoopRzOptionsClearCached(env, opts);
+ if (opts->byteBufferPool) {
+ (*env)->DeleteGlobalRef(env, opts->byteBufferPool);
+ opts->byteBufferPool = NULL;
}
- return 0;
+ free(opts);
}
-void hadoopZeroCopyCursorFree(struct hadoopZeroCopyCursor *zcursor)
+struct hadoopRzBuffer
{
- JNIEnv* env;
+ jobject byteBuffer;
+ uint8_t *ptr;
+ int32_t length;
+ int direct;
+};
+
+static jthrowable hadoopRzOptionsGetEnumSet(JNIEnv *env,
+ struct hadoopRzOptions *opts, jobject *enumSet)
+{
+ jthrowable jthr = NULL;
+ jobject enumInst = NULL, enumSetObj = NULL;
+ jvalue jVal;
+
+ if (opts->cachedEnumSet) {
+ // If we cached the value, return it now.
+ *enumSet = opts->cachedEnumSet;
+ goto done;
+ }
+ if (opts->skipChecksums) {
+ jthr = fetchEnumInstance(env, READ_OPTION,
+ "SKIP_CHECKSUMS", &enumInst);
+ if (jthr) {
+ goto done;
+ }
+ jthr = invokeMethod(env, &jVal, STATIC, NULL,
+ "java/util/EnumSet", "of",
+ "(Ljava/lang/Enum;)Ljava/util/EnumSet;", enumInst);
+ if (jthr) {
+ goto done;
+ }
+ enumSetObj = jVal.l;
+ } else {
+ jclass clazz = (*env)->FindClass(env, READ_OPTION);
+ if (!clazz) {
+ jthr = newRuntimeError(env, "failed "
+ "to find class for %s", READ_OPTION);
+ goto done;
+ }
+ jthr = invokeMethod(env, &jVal, STATIC, NULL,
+ "java/util/EnumSet", "noneOf",
+ "(Ljava/lang/Class;)Ljava/util/EnumSet;", clazz);
+ if (jthr) {
+ goto done;
+ }
+ enumSetObj = jVal.l;
+ }
+ // create global ref
+ opts->cachedEnumSet = (*env)->NewGlobalRef(env, enumSetObj);
+ if (!opts->cachedEnumSet) {
+ jthr = getPendingExceptionAndClear(env);
+ goto done;
+ }
+ *enumSet = opts->cachedEnumSet;
+ jthr = NULL;
+done:
+ (*env)->DeleteLocalRef(env, enumInst);
+ (*env)->DeleteLocalRef(env, enumSetObj);
+ return jthr;
+}
+
+static int hadoopReadZeroExtractBuffer(JNIEnv *env,
+ const struct hadoopRzOptions *opts, struct hadoopRzBuffer *buffer)
+{
+ int ret;
jthrowable jthr;
+ jvalue jVal;
+ uint8_t *directStart;
+ void *mallocBuf = NULL;
+ jint position;
+ jarray array = NULL;
- env = getJNIEnv();
- if (env == NULL) {
- return;
+ jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
+ "java/nio/ByteBuffer", "remaining", "()I");
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hadoopReadZeroExtractBuffer: ByteBuffer#remaining failed: ");
+ goto done;
}
- jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor,
- HADOOP_ZERO_COPY_CURSOR, "close", "()V");
+ buffer->length = jVal.i;
+ jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
+ "java/nio/ByteBuffer", "position", "()I");
if (jthr) {
- printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
- "hadoopZeroCopyCursorFree(): close failed");
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hadoopReadZeroExtractBuffer: ByteBuffer#position failed: ");
+ goto done;
+ }
+ position = jVal.i;
+ directStart = (*env)->GetDirectBufferAddress(env, buffer->byteBuffer);
+ if (directStart) {
+ // Handle direct buffers.
+ buffer->ptr = directStart + position;
+ buffer->direct = 1;
+ ret = 0;
+ goto done;
+ }
+ // Handle indirect buffers.
+ // The JNI docs don't say that GetDirectBufferAddress throws any exceptions
+ // when it fails. However, they also don't clearly say that it doesn't. It
+ // seems safest to clear any pending exceptions here, to prevent problems on
+ // various JVMs.
+ (*env)->ExceptionClear(env);
+ if (!opts->byteBufferPool) {
+ fputs("hadoopReadZeroExtractBuffer: we read through the "
+ "zero-copy path, but failed to get the address of the buffer via "
+ "GetDirectBufferAddress. Please make sure your JVM supports "
+ "GetDirectBufferAddress.\n", stderr);
+ ret = ENOTSUP;
+ goto done;
+ }
+ // Get the backing array object of this buffer.
+ jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
+ "java/nio/ByteBuffer", "array", "()[B");
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hadoopReadZeroExtractBuffer: ByteBuffer#array failed: ");
+ goto done;
+ }
+ array = jVal.l;
+ if (!array) {
+ fputs("hadoopReadZeroExtractBuffer: ByteBuffer#array returned NULL.\n",
+ stderr);
+ ret = EIO;
+ goto done;
+ }
+ mallocBuf = malloc(buffer->length);
+ if (!mallocBuf) {
+ fprintf(stderr, "hadoopReadZeroExtractBuffer: failed to allocate %d bytes of memory\n",
+ buffer->length);
+ ret = ENOMEM;
+ goto done;
+ }
+ (*env)->GetByteArrayRegion(env, array, position, buffer->length, mallocBuf);
+ jthr = (*env)->ExceptionOccurred(env);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hadoopReadZeroExtractBuffer: GetByteArrayRegion failed: ");
+ goto done;
}
- (*env)->DeleteGlobalRef(env, (jobject)zcursor);
+ buffer->ptr = mallocBuf;
+ buffer->direct = 0;
+ mallocBuf = NULL; /* buffer->ptr now owns this allocation; don't free it below */
+ ret = 0;
+
+done:
+ free(mallocBuf);
+ (*env)->DeleteLocalRef(env, array);
+ return ret;
}
-/**
- * Translate an exception from ZeroCopyCursor#read, translate it into a return
- * code.
- */
static int translateZCRException(JNIEnv *env, jthrowable exc)
{
int ret;
@@ -2255,16 +2362,12 @@ static int translateZCRException(JNIEnv
jthrowable jthr = classNameOfObject(exc, env, &className);
if (jthr) {
- fprintf(stderr, "hadoopZeroCopyRead: unknown "
- "exception from read().\n");
- destroyLocalReference(env, jthr);
+ fputs("hadoopReadZero: failed to get class name of "
+ "exception from read().\n", stderr);
+ destroyLocalReference(env, exc);
destroyLocalReference(env, jthr);
ret = EIO;
goto done;
- }
- if (!strcmp(className, "java.io.EOFException")) {
- ret = 0; // EOF
- goto done;
}
if (!strcmp(className, "java.lang.UnsupportedOperationException")) {
ret = EPROTONOSUPPORT;
@@ -2277,72 +2380,116 @@ done:
return ret;
}
-int32_t hadoopZeroCopyRead(struct hadoopZeroCopyCursor *zcursor,
- int32_t toRead, const void **data)
+struct hadoopRzBuffer* hadoopReadZero(hdfsFile file,
+ struct hadoopRzOptions *opts, int32_t maxLength)
{
- int32_t ret, nRead = -1;
- JNIEnv* env;
- jthrowable jthr;
- jobject byteBuffer = NULL;
- uint8_t *addr;
- jint position;
+ JNIEnv *env;
+ jthrowable jthr = NULL;
jvalue jVal;
-
+ jobject enumSet = NULL, byteBuffer = NULL;
+ struct hadoopRzBuffer* buffer = NULL;
+ int ret;
+
env = getJNIEnv();
- if (env == NULL) {
+ if (!env) {
errno = EINTERNAL;
- return -1;
- }
- jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor,
- HADOOP_ZERO_COPY_CURSOR, "read", "(I)V", toRead);
- if (jthr) {
- ret = translateZCRException(env, jthr);
- goto done;
+ return NULL;
}
- jthr = invokeMethod(env, &jVal, INSTANCE, (jobject)zcursor,
- HADOOP_ZERO_COPY_CURSOR, "getData",
- "()Ljava/nio/ByteBuffer;");
- if (jthr) {
- ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
- "hadoopZeroCopyRead(toRead=%"PRId32"): getData failed",
- toRead);
+ if (file->type != INPUT) {
+ fputs("Cannot read from a non-InputStream object!\n", stderr);
+ ret = EINVAL;
goto done;
}
- byteBuffer = jVal.l;
- addr = (*env)->GetDirectBufferAddress(env, byteBuffer);
- if (!addr) {
- fprintf(stderr, "hadoopZeroCopyRead(toRead=%"PRId32"): "
- "failed to get direct buffer address.\n", toRead);
- ret = EIO;
+ buffer = calloc(1, sizeof(struct hadoopRzBuffer));
+ if (!buffer) {
+ ret = ENOMEM;
goto done;
}
- jthr = invokeMethod(env, &jVal, INSTANCE, byteBuffer,
- "java/nio/ByteBuffer", "position", "()I");
+ jthr = hadoopRzOptionsGetEnumSet(env, opts, &enumSet);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
- "hadoopZeroCopyRead(toRead=%"PRId32"): ByteBuffer#position "
- "failed", toRead);
+ "hadoopReadZero: hadoopRzOptionsGetEnumSet failed: ");
goto done;
}
- position = jVal.i;
- jthr = invokeMethod(env, &jVal, INSTANCE, byteBuffer,
- "java/nio/ByteBuffer", "remaining", "()I");
+ jthr = invokeMethod(env, &jVal, INSTANCE, file->file, HADOOP_ISTRM, "read",
+ "(Lorg/apache/hadoop/io/ByteBufferPool;ILjava/util/EnumSet;)"
+ "Ljava/nio/ByteBuffer;", opts->byteBufferPool, maxLength, enumSet);
if (jthr) {
- ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
- "hadoopZeroCopyRead(toRead=%"PRId32"): ByteBuffer#remaining "
- "failed", toRead);
+ ret = translateZCRException(env, jthr);
goto done;
}
+ byteBuffer = jVal.l;
+ if (!byteBuffer) {
+ buffer->byteBuffer = NULL;
+ buffer->length = 0;
+ buffer->ptr = NULL;
+ } else {
+ buffer->byteBuffer = (*env)->NewGlobalRef(env, byteBuffer);
+ if (!buffer->byteBuffer) {
+ ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+ "hadoopReadZero: failed to create global ref to ByteBuffer");
+ goto done;
+ }
+ ret = hadoopReadZeroExtractBuffer(env, opts, buffer);
+ if (ret) {
+ goto done;
+ }
+ }
ret = 0;
- nRead = jVal.i;
- *data = addr + position;
done:
(*env)->DeleteLocalRef(env, byteBuffer);
- if (nRead == -1) {
+ if (ret) {
+ if (buffer) {
+ if (buffer->byteBuffer) {
+ (*env)->DeleteGlobalRef(env, buffer->byteBuffer);
+ }
+ free(buffer);
+ }
errno = ret;
- return -1;
+ return NULL;
+ } else {
+ errno = 0;
+ }
+ return buffer;
+}
+
+int32_t hadoopRzBufferLength(const struct hadoopRzBuffer *buffer)
+{
+ return buffer->length;
+}
+
+const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer)
+{
+ return buffer->ptr;
+}
+
+void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer)
+{
+ jvalue jVal;
+ jthrowable jthr;
+ JNIEnv* env;
+
+ env = getJNIEnv();
+ if (env == NULL) {
+ errno = EINTERNAL;
+ return;
+ }
+ if (buffer->byteBuffer) {
+ jthr = invokeMethod(env, &jVal, INSTANCE, file->file,
+ HADOOP_ISTRM, "releaseBuffer",
+ "(Ljava/nio/ByteBuffer;)V", buffer->byteBuffer);
+ if (jthr) {
+ printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hadoopRzBufferFree: releaseBuffer failed: ");
+ // even on error, we have to delete the reference.
+ }
+ (*env)->DeleteGlobalRef(env, buffer->byteBuffer);
+ }
+ if (!buffer->direct) {
+ free(buffer->ptr);
}
- return nRead;
+ memset(buffer, 0, sizeof(*buffer));
+ free(buffer);
}
char***
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h Tue Sep 24 21:40:53 2013
@@ -36,6 +36,8 @@
#define EINTERNAL 255
#endif
+#define ELASTIC_BYTE_BUFFER_POOL_CLASS \
+ "org/apache/hadoop/io/ElasticByteBufferPool"
/** All APIs set errno to meaningful values */
@@ -65,6 +67,10 @@ extern "C" {
struct hdfsFile_internal;
typedef struct hdfsFile_internal* hdfsFile;
+ struct hadoopRzOptions;
+
+ struct hadoopRzBuffer;
+
/**
* Determine if a file is open for read.
*
@@ -683,86 +689,104 @@ extern "C" {
int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime);
/**
- * Create a zero-copy cursor object.
+ * Allocate a zero-copy options structure.
*
- * @param file The file to use for zero-copy reads.
+ * You must free all options structures allocated with this function using
+ * hadoopRzOptionsFree.
*
- * @return The zero-copy cursor, or NULL + errno on failure.
+ * @return A zero-copy options structure, or NULL if one could
+ * not be allocated. If NULL is returned, errno will
+ * contain the error number.
*/
- struct hadoopZeroCopyCursor* hadoopZeroCopyCursorAlloc(hdfsFile file);
+ struct hadoopRzOptions *hadoopRzOptionsAlloc(void);
/**
- * Set the fallback buffer which will be used by the zero copy object.
+ * Determine whether we should skip checksums in hadoopReadZero.
*
- * You are responsible for ensuring that this buffer stays valid until you
- * either set a different buffer by calling this function again, or free the
- * zero-copy cursor.
+ * @param opts The options structure.
+ * @param skip Nonzero to skip checksums sometimes; zero to always
+ * check them.
*
- * @param zcursor The zero-copy cursor.
- * @param cbuf The buffer to use.
- * @param size Size of the buffer.
- *
- * @return 0 on success. -1 on error. Errno will be set on
- * error.
+ * @return 0 on success; -1 plus errno on failure.
*/
- int hadoopZeroCopyCursorSetFallbackBuffer(
- struct hadoopZeroCopyCursor* zcursor, void *cbuf, uint32_t size);
+ int hadoopRzOptionsSetSkipChecksum(
+ struct hadoopRzOptions *opts, int skip);
/**
- * Set whether our cursor should skip checksums or not.
+ * Set the ByteBufferPool to use with hadoopReadZero.
*
- * @param zcursor The cursor
- * @param skipChecksums Nonzero to skip checksums.
+ * @param opts The options structure.
+ * @param className If this is NULL, we will not use any
+ * ByteBufferPool. If this is non-NULL, it will be
+ * treated as the name of the pool class to use.
+ * For example, you can use
+ * ELASTIC_BYTE_BUFFER_POOL_CLASS.
*
- * @return -1 on error, 0 otherwise.
+ * @return 0 if the ByteBufferPool class was found and
+ * instantiated;
+ * -1 plus errno otherwise.
*/
- int hadoopZeroCopyCursorSetSkipChecksums(
- struct hadoopZeroCopyCursor* zcursor, int skipChecksums);
+ int hadoopRzOptionsSetByteBufferPool(
+ struct hadoopRzOptions *opts, const char *className);
/**
- * Set whether our cursor should allow short reads to occur.
- * Short reads will always occur if there is not enough data to read
- * (i.e., at EOF), but normally we don't return them when reading other
- * parts of the file.
- *
- * @param zcursor The cursor
- * @param skipChecksums Nonzero to skip checksums.
+ * Free a hadoopRzOptions structure.
*
- * @return -1 on error, 0 otherwise.
+ * @param opts The options structure to free.
+ * Any associated ByteBufferPool will also be freed.
*/
- int hadoopZeroCopyCursorSetAllowShortReads(
- struct hadoopZeroCopyCursor* zcursor, int allowShort);
+ void hadoopRzOptionsFree(struct hadoopRzOptions *opts);
/**
- * Free zero-copy cursor.
+ * Perform a byte buffer read.
+ * If possible, this will be a zero-copy (mmap) read.
*
- * This will dispose of the cursor allocated by hadoopZeroCopyCursorAlloc, as
- * well as any memory map that we have created. You must be done with the
- * data returned from hadoopZeroCopyRead before calling this.
+ * @param file The file to read from.
+ * @param opts An options structure created by hadoopRzOptionsAlloc.
+ * @param maxLength The maximum length to read. We may read fewer bytes
+ * than this length.
+ *
+ * @return On success, returns a new hadoopRzBuffer.
+ * This buffer will continue to be valid and readable
+ * until it is released by hadoopRzBufferFree. Failure to
+ * release a buffer will lead to a memory leak.
+ *
+ * NULL plus an errno code on an error.
+ * errno = EPROTONOSUPPORT indicates that we could not do a
+ * zero-copy read, and there was no ByteBufferPool
+ * supplied.
+ */
+ struct hadoopRzBuffer* hadoopReadZero(hdfsFile file,
+ struct hadoopRzOptions *opts, int32_t maxLength);
+
+ /**
+ * Determine the length of the buffer returned from hadoopReadZero.
*
- * @param zcursor The zero-copy cursor.
+ * @param buffer a buffer returned from hadoopReadZero.
+ * @return the length of the buffer.
*/
- void hadoopZeroCopyCursorFree(struct hadoopZeroCopyCursor *zcursor);
+ int32_t hadoopRzBufferLength(const struct hadoopRzBuffer *buffer);
- /*
- * Perform a zero-copy read.
+ /**
+ * Get a pointer to the raw buffer returned from hadoopReadZero.
+ *
+ * To find out how many bytes this buffer contains, call
+ * hadoopRzBufferLength.
*
- * @param zcursor The zero-copy cursor object.
- * @param toRead The maximum amount to read.
- * @param data (out param) on succesful return, a pointer to the
- * data. This pointer will remain valid until the next
- * call to hadoopZeroCopyRead, or until
- * hadoopZeroCopyCursorFree is called on zcursor.
+ * @param buffer a buffer returned from hadoopReadZero.
+ * @return a pointer to the start of the buffer. This will be
+ * NULL when end-of-file has been reached.
+ */
+ const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer);
+
+ /**
+ * Release a buffer obtained through hadoopReadZero.
*
- * @return -2 if zero-copy is unavailable, and
- * -1 if there was an error. errno will be the error.
- * 0 if we hit end-of-file without reading anything.
- * The positive number of bytes read otherwise. Short
- * reads will happen only if EOF is reached.
- * The amount read otherwise.
+ * @param file The hdfs stream that created this buffer. This must be
+ * the same stream you called hadoopReadZero on.
+ * @param buffer The buffer to release.
*/
- int32_t hadoopZeroCopyRead(struct hadoopZeroCopyCursor *zcursor,
- int32_t toRead, const void **data);
+ void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer);
#ifdef __cplusplus
}
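Putting the new header together, a hedged sketch of a complete read loop (assuming an open hdfsFile named file; per the documentation above, hadoopRzBufferGet returns NULL at end-of-file, and with both skip-checksums and a pool configured each request is served either by mmap or by a fallback copy):

    #include "hdfs.h"
    #include <stdio.h>

    /* Sketch: stream through a file with hadoopReadZero, 1 MB at a time. */
    static long long readWholeFile(hdfsFile file)
    {
        struct hadoopRzOptions *opts = hadoopRzOptionsAlloc();
        struct hadoopRzBuffer *buffer;
        long long total = 0;

        hadoopRzOptionsSetSkipChecksum(opts, 1);
        hadoopRzOptionsSetByteBufferPool(opts, ELASTIC_BYTE_BUFFER_POOL_CLASS);
        for (;;) {
            buffer = hadoopReadZero(file, opts, 1024 * 1024);
            if (!buffer) {
                perror("hadoopReadZero");
                break;
            }
            if (!hadoopRzBufferGet(buffer)) {      /* NULL means end-of-file */
                hadoopRzBufferFree(file, buffer);
                break;
            }
            total += hadoopRzBufferLength(buffer);
            /* ... process hadoopRzBufferGet(buffer) here ... */
            hadoopRzBufferFree(file, buffer);
        }
        hadoopRzOptionsFree(opts);
        return total;
    }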
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c Tue Sep 24 21:40:53 2013
@@ -647,3 +647,34 @@ done:
(*env)->DeleteLocalRef(env, jvalue);
return jthr;
}
+
+jthrowable fetchEnumInstance(JNIEnv *env, const char *className,
+ const char *valueName, jobject *out)
+{
+ jclass clazz;
+ jfieldID fieldId;
+ jobject jEnum;
+ char prettyClass[256];
+
+ clazz = (*env)->FindClass(env, className);
+ if (!clazz) {
+ return newRuntimeError(env, "fetchEnum(%s, %s): failed to find class.",
+ className, valueName);
+ }
+ if (snprintf(prettyClass, sizeof(prettyClass), "L%s;", className)
+ >= sizeof(prettyClass)) {
+ return newRuntimeError(env, "fetchEnum(%s, %s): class name too long.",
+ className, valueName);
+ }
+ fieldId = (*env)->GetStaticFieldID(env, clazz, valueName, prettyClass);
+ if (!fieldId) {
+ return getPendingExceptionAndClear(env);
+ }
+ jEnum = (*env)->GetStaticObjectField(env, clazz, fieldId);
+ if (!jEnum) {
+ return getPendingExceptionAndClear(env);
+ }
+ *out = jEnum;
+ return NULL;
+}
+
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h Tue Sep 24 21:40:53 2013
@@ -140,6 +140,21 @@ int javaObjectIsOfClass(JNIEnv *env, job
jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
const char *key, const char *value);
+/**
+ * Fetch an instance of an Enum.
+ *
+ * @param env The JNI environment.
+ * @param className The enum class name.
+ * @param valueName The name of the enum value
+ * @param out (out param) on success, a local reference to an
+ * instance of the enum object. (Since Java enums are
+ * singletons, this is also the only instance.)
+ *
+ * @return NULL on success; exception otherwise
+ */
+jthrowable fetchEnumInstance(JNIEnv *env, const char *className,
+ const char *valueName, jobject *out);
+
#endif /*LIBHDFS_JNI_HELPER_H*/
/**
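As a usage note, a hedged sketch of calling the new fetchEnumInstance helper, mirroring its use in hadoopRzOptionsGetEnumSet in hdfs.c above (env is assumed to be a valid JNIEnv pointer):

    jobject skipChecksums = NULL;
    jthrowable jthr;

    /* Fetch the singleton org.apache.hadoop.fs.ReadOption.SKIP_CHECKSUMS. */
    jthr = fetchEnumInstance(env, "org/apache/hadoop/fs/ReadOption",
        "SKIP_CHECKSUMS", &skipChecksums);
    if (jthr) {
        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "fetchEnumInstance(ReadOption.SKIP_CHECKSUMS)");
    } else {
        /* ... use the local reference, then release it ... */
        (*env)->DeleteLocalRef(env, skipChecksums);
    }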
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c?rev=1526020&r1=1526019&r2=1526020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c Tue Sep 24 21:40:53 2013
@@ -81,40 +81,49 @@ static void printBuf(const uint8_t *buf,
static int doTestZeroCopyReads(hdfsFS fs, const char *fileName)
{
hdfsFile file = NULL;
- struct hadoopZeroCopyCursor *zcursor = NULL;
- uint8_t *backingBuffer = NULL, *block;
- const void *zcPtr;
+ struct hadoopRzOptions *opts = NULL;
+ struct hadoopRzBuffer *buffer = NULL;
+ uint8_t *block;
file = hdfsOpenFile(fs, fileName, O_RDONLY, 0, 0, 0);
EXPECT_NONNULL(file);
- zcursor = hadoopZeroCopyCursorAlloc(file);
- EXPECT_NONNULL(zcursor);
+ opts = hadoopRzOptionsAlloc();
+ EXPECT_NONNULL(opts);
+ EXPECT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 1));
/* haven't read anything yet */
EXPECT_ZERO(expectFileStats(file, 0LL, 0LL, 0LL, 0LL));
block = getZeroCopyBlockData(0);
EXPECT_NONNULL(block);
/* first read is half of a block. */
+ buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2);
+ EXPECT_NONNULL(buffer);
EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
- hadoopZeroCopyRead(zcursor,
- TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2, &zcPtr));
- EXPECT_ZERO(memcmp(zcPtr, block, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
+ hadoopRzBufferLength(buffer));
+ EXPECT_ZERO(memcmp(hadoopRzBufferGet(buffer), block,
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
+ hadoopRzBufferFree(file, buffer);
/* read the next half of the block */
+ buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2);
+ EXPECT_NONNULL(buffer);
EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
- hadoopZeroCopyRead(zcursor,
- TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2, &zcPtr));
- EXPECT_ZERO(memcmp(zcPtr, block + (TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2),
- TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
+ hadoopRzBufferLength(buffer));
+ EXPECT_ZERO(memcmp(hadoopRzBufferGet(buffer),
+ block + (TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2),
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
+ hadoopRzBufferFree(file, buffer);
free(block);
EXPECT_ZERO(expectFileStats(file, TEST_ZEROCOPY_FULL_BLOCK_SIZE,
TEST_ZEROCOPY_FULL_BLOCK_SIZE,
TEST_ZEROCOPY_FULL_BLOCK_SIZE,
TEST_ZEROCOPY_FULL_BLOCK_SIZE));
/* Now let's read just a few bytes. */
- EXPECT_INT_EQ(SMALL_READ_LEN,
- hadoopZeroCopyRead(zcursor, SMALL_READ_LEN, &zcPtr));
+ buffer = hadoopReadZero(file, opts, SMALL_READ_LEN);
+ EXPECT_NONNULL(buffer);
+ EXPECT_INT_EQ(SMALL_READ_LEN, hadoopRzBufferLength(buffer));
block = getZeroCopyBlockData(1);
EXPECT_NONNULL(block);
- EXPECT_ZERO(memcmp(block, zcPtr, SMALL_READ_LEN));
+ EXPECT_ZERO(memcmp(block, hadoopRzBufferGet(buffer), SMALL_READ_LEN));
+ hadoopRzBufferFree(file, buffer);
EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
hdfsTell(fs, file));
EXPECT_ZERO(expectFileStats(file,
@@ -123,37 +132,36 @@ static int doTestZeroCopyReads(hdfsFS fs
TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
- /* Try to read a full block's worth of data. This will cross the block
- * boundary, which means we have to fall back to non-zero-copy reads.
- * However, because we don't have a backing buffer, the fallback will fail
- * with EPROTONOSUPPORT. */
- EXPECT_INT_EQ(-1,
- hadoopZeroCopyRead(zcursor, TEST_ZEROCOPY_FULL_BLOCK_SIZE, &zcPtr));
+ /* Clear 'skip checksums' and test that we can't do zero-copy reads any
+ * more. Since there is no ByteBufferPool set, we should fail with
+ * EPROTONOSUPPORT.
+ */
+ EXPECT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 0));
+ EXPECT_NULL(hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE));
EXPECT_INT_EQ(EPROTONOSUPPORT, errno);
- /* Now set a backing buffer and try again. It should succeed this time. */
- backingBuffer = malloc(ZC_BUF_LEN);
- EXPECT_NONNULL(backingBuffer);
- EXPECT_ZERO(hadoopZeroCopyCursorSetFallbackBuffer(zcursor,
- backingBuffer, ZC_BUF_LEN));
- EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE,
- hadoopZeroCopyRead(zcursor, TEST_ZEROCOPY_FULL_BLOCK_SIZE, &zcPtr));
+ /* Now set a ByteBufferPool and try again. It should succeed this time. */
+ EXPECT_ZERO(hadoopRzOptionsSetByteBufferPool(opts,
+ ELASTIC_BYTE_BUFFER_POOL_CLASS));
+ buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE);
+ EXPECT_NONNULL(buffer);
+ EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE, hadoopRzBufferLength(buffer));
EXPECT_ZERO(expectFileStats(file,
(2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
(2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
(2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
- EXPECT_ZERO(memcmp(block + SMALL_READ_LEN, zcPtr,
+ EXPECT_ZERO(memcmp(block + SMALL_READ_LEN, hadoopRzBufferGet(buffer),
TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN));
free(block);
block = getZeroCopyBlockData(2);
EXPECT_NONNULL(block);
- EXPECT_ZERO(memcmp(block, zcPtr +
+ EXPECT_ZERO(memcmp(block, hadoopRzBufferGet(buffer) +
(TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN), SMALL_READ_LEN));
+ hadoopRzBufferFree(file, buffer);
free(block);
- hadoopZeroCopyCursorFree(zcursor);
+ hadoopRzOptionsFree(opts);
EXPECT_ZERO(hdfsCloseFile(fs, file));
- free(backingBuffer);
return 0;
}
Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java?rev=1526020&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java (added)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java Tue Sep 24 21:40:53 2013
@@ -0,0 +1,530 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeoutException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.Random;
+
+import org.apache.commons.lang.SystemUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+
+/**
+ * This class tests whether EnhancedByteBufferAccess works correctly.
+ */
+public class TestEnhancedByteBufferAccess {
+ private static final Log LOG =
+ LogFactory.getLog(TestEnhancedByteBufferAccess.class.getName());
+
+ static TemporarySocketDirectory sockDir;
+
+ @BeforeClass
+ public static void init() {
+ sockDir = new TemporarySocketDirectory();
+ DomainSocket.disableBindPathValidation();
+ }
+
+ private static byte[] byteBufferToArray(ByteBuffer buf) {
+ byte resultArray[] = new byte[buf.remaining()];
+ buf.get(resultArray);
+ buf.flip();
+ return resultArray;
+ }
+
+ public static HdfsConfiguration initZeroCopyTest() {
+ Assume.assumeTrue(NativeIO.isAvailable());
+ Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
+ HdfsConfiguration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
+ conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3);
+ conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100);
+ conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+ new File(sockDir.getDir(),
+ "TestRequestMmapAccess._PORT.sock").getAbsolutePath());
+ conf.setBoolean(DFSConfigKeys.
+ DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
+ return conf;
+ }
+
+ @Test
+ public void testZeroCopyReads() throws Exception {
+ HdfsConfiguration conf = initZeroCopyTest();
+ MiniDFSCluster cluster = null;
+ final Path TEST_PATH = new Path("/a");
+ FSDataInputStream fsIn = null;
+ final int TEST_FILE_LENGTH = 12345;
+
+ FileSystem fs = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, TEST_PATH,
+ TEST_FILE_LENGTH, (short)1, 7567L);
+ try {
+ DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+ } catch (InterruptedException e) {
+ Assert.fail("unexpected InterruptedException during " +
+ "waitReplication: " + e);
+ } catch (TimeoutException e) {
+ Assert.fail("unexpected TimeoutException during " +
+ "waitReplication: " + e);
+ }
+ fsIn = fs.open(TEST_PATH);
+ byte original[] = new byte[TEST_FILE_LENGTH];
+ IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+ fsIn.close();
+ fsIn = fs.open(TEST_PATH);
+ ByteBuffer result = fsIn.read(null, 4096,
+ EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+ Assert.assertEquals(4096, result.remaining());
+ HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
+ Assert.assertEquals(4096,
+ dfsIn.getReadStatistics().getTotalBytesRead());
+ Assert.assertEquals(4096,
+ dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
+ Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
+ byteBufferToArray(result));
+ fsIn.releaseBuffer(result);
+ } finally {
+ if (fsIn != null) fsIn.close();
+ if (fs != null) fs.close();
+ if (cluster != null) cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testShortZeroCopyReads() throws Exception {
+ HdfsConfiguration conf = initZeroCopyTest();
+ MiniDFSCluster cluster = null;
+ final Path TEST_PATH = new Path("/a");
+ FSDataInputStream fsIn = null;
+ final int TEST_FILE_LENGTH = 12345;
+
+ FileSystem fs = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, TEST_PATH, TEST_FILE_LENGTH, (short)1, 7567L);
+ try {
+ DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+ } catch (InterruptedException e) {
+ Assert.fail("unexpected InterruptedException during " +
+ "waitReplication: " + e);
+ } catch (TimeoutException e) {
+ Assert.fail("unexpected TimeoutException during " +
+ "waitReplication: " + e);
+ }
+ fsIn = fs.open(TEST_PATH);
+ byte original[] = new byte[TEST_FILE_LENGTH];
+ IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+ fsIn.close();
+ fsIn = fs.open(TEST_PATH);
+
+ // Try to read 8192, but only get 4096 because of the block size.
+ HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
+ ByteBuffer result =
+ dfsIn.read(null, 8192, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+ Assert.assertEquals(4096, result.remaining());
+ Assert.assertEquals(4096,
+ dfsIn.getReadStatistics().getTotalBytesRead());
+ Assert.assertEquals(4096,
+ dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
+ Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
+ byteBufferToArray(result));
+ dfsIn.releaseBuffer(result);
+
+ // Try to read 4097, but only get 4096 because of the block size.
+ result =
+ dfsIn.read(null, 4097, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+ Assert.assertEquals(4096, result.remaining());
+ Assert.assertArrayEquals(Arrays.copyOfRange(original, 4096, 8192),
+ byteBufferToArray(result));
+ dfsIn.releaseBuffer(result);
+ } finally {
+ if (fsIn != null) fsIn.close();
+ if (fs != null) fs.close();
+ if (cluster != null) cluster.shutdown();
+ }
+ }
+
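The two truncation checks above pin down one property of the new API: a
zero-copy read never crosses a block boundary, so the returned buffer may
hold fewer bytes than requested. A caller that needs a fixed amount of data
loops; a minimal sketch, not part of this commit, assuming the whole range
is mmap-able via short-circuit reads:

    // Sketch: drain 'length' bytes via repeated zero-copy reads; each
    // read is truncated at the current block boundary.
    static void readFullyZeroCopy(FSDataInputStream in, long length)
        throws IOException {
      long done = 0;
      while (done < length) {
        ByteBuffer buf = in.read(null, (int) Math.min(length - done, 4096),
            EnumSet.of(ReadOption.SKIP_CHECKSUMS));
        if (buf == null) {
          break;  // end of stream
        }
        done += buf.remaining();
        // ... consume buf ...
        in.releaseBuffer(buf);
      }
    }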
+ @Test
+ public void testZeroCopyReadsNoFallback() throws Exception {
+ HdfsConfiguration conf = initZeroCopyTest();
+ MiniDFSCluster cluster = null;
+ final Path TEST_PATH = new Path("/a");
+ FSDataInputStream fsIn = null;
+ final int TEST_FILE_LENGTH = 12345;
+
+ FileSystem fs = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, TEST_PATH,
+ TEST_FILE_LENGTH, (short)1, 7567L);
+ try {
+ DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+ } catch (InterruptedException e) {
+ Assert.fail("unexpected InterruptedException during " +
+ "waitReplication: " + e);
+ } catch (TimeoutException e) {
+ Assert.fail("unexpected TimeoutException during " +
+ "waitReplication: " + e);
+ }
+ fsIn = fs.open(TEST_PATH);
+ byte original[] = new byte[TEST_FILE_LENGTH];
+ IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+ fsIn.close();
+ fsIn = fs.open(TEST_PATH);
+ HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
+ ByteBuffer result;
+ try {
+ result = dfsIn.read(null, 4097, EnumSet.noneOf(ReadOption.class));
+ Assert.fail("expected UnsupportedOperationException");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+ result = dfsIn.read(null, 4096, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+ Assert.assertEquals(4096, result.remaining());
+ Assert.assertEquals(4096,
+ dfsIn.getReadStatistics().getTotalBytesRead());
+ Assert.assertEquals(4096,
+ dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
+ Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
+ byteBufferToArray(result));
+ } finally {
+ if (fsIn != null) fsIn.close();
+ if (fs != null) fs.close();
+ if (cluster != null) cluster.shutdown();
+ }
+ }
+
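testZeroCopyReadsNoFallback above fixes the contract for a null pool: a
request that cannot be satisfied zero-copy fails fast with
UnsupportedOperationException rather than silently copying. Supplying a
ByteBufferPool flips that behavior; a minimal sketch, not part of this
commit:

    // Sketch: with a pool supplied, the same request falls back to an
    // ordinary copying read into a pooled buffer instead of throwing.
    static void readWithFallback(FSDataInputStream in, int maxLength)
        throws IOException {
      ByteBufferPool pool = new ElasticByteBufferPool();
      ByteBuffer buf = in.read(pool, maxLength, EnumSet.noneOf(ReadOption.class));
      if (buf != null) {
        // ... consume buf ...
        in.releaseBuffer(buf);  // returns a fallback buffer to the pool
      }
    }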
+ private static class CountingVisitor
+ implements ClientMmapManager.ClientMmapVisitor {
+ int count = 0;
+
+ @Override
+ public void accept(ClientMmap mmap) {
+ count++;
+ }
+
+ public void reset() {
+ count = 0;
+ }
+ }
+
+ @Test
+ public void testZeroCopyMmapCache() throws Exception {
+ HdfsConfiguration conf = initZeroCopyTest();
+ MiniDFSCluster cluster = null;
+ final Path TEST_PATH = new Path("/a");
+ final int TEST_FILE_LENGTH = 16385;
+ final int RANDOM_SEED = 23453;
+ FSDataInputStream fsIn = null;
+ ByteBuffer results[] = { null, null, null, null, null };
+
+ DistributedFileSystem fs = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, TEST_PATH,
+ TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
+ try {
+ DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+ } catch (InterruptedException e) {
+ Assert.fail("unexpected InterruptedException during " +
+ "waitReplication: " + e);
+ } catch (TimeoutException e) {
+ Assert.fail("unexpected TimeoutException during " +
+ "waitReplication: " + e);
+ }
+ fsIn = fs.open(TEST_PATH);
+ byte original[] = new byte[TEST_FILE_LENGTH];
+ IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+ fsIn.close();
+ fsIn = fs.open(TEST_PATH);
+ final ClientMmapManager mmapManager = fs.getClient().getMmapManager();
+ final CountingVisitor countingVisitor = new CountingVisitor();
+ mmapManager.visitMmaps(countingVisitor);
+ Assert.assertEquals(0, countingVisitor.count);
+ mmapManager.visitEvictable(countingVisitor);
+ Assert.assertEquals(0, countingVisitor.count);
+ results[0] = fsIn.read(null, 4096,
+ EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+ fsIn.seek(0);
+ results[1] = fsIn.read(null, 4096,
+ EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+ mmapManager.visitMmaps(countingVisitor);
+ Assert.assertEquals(1, countingVisitor.count);
+ countingVisitor.reset();
+ mmapManager.visitEvictable(countingVisitor);
+ Assert.assertEquals(0, countingVisitor.count);
+ countingVisitor.reset();
+
+ // The mmaps should be of the first block of the file.
+ final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+ mmapManager.visitMmaps(new ClientMmapManager.ClientMmapVisitor() {
+ @Override
+ public void accept(ClientMmap mmap) {
+ Assert.assertEquals(firstBlock, mmap.getBlock());
+ }
+ });
+
+ // Read more blocks.
+ results[2] = fsIn.read(null, 4096,
+ EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+ results[3] = fsIn.read(null, 4096,
+ EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+ try {
+ results[4] = fsIn.read(null, 4096,
+ EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+ Assert.fail("expected UnsupportedOperationException");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ // we should have 3 mmaps, 0 evictable
+ mmapManager.visitMmaps(countingVisitor);
+ Assert.assertEquals(3, countingVisitor.count);
+ countingVisitor.reset();
+ mmapManager.visitEvictable(countingVisitor);
+ Assert.assertEquals(0, countingVisitor.count);
+
+ // After we release the buffers, the mmaps should be evictable for
+ // a brief period of time.  Then, they should be closed (we're
+ // using a very quick timeout).
+ for (ByteBuffer buffer : results) {
+ if (buffer != null) {
+ fsIn.releaseBuffer(buffer);
+ }
+ }
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ public Boolean get() {
+ countingVisitor.reset();
+ try {
+ mmapManager.visitEvictable(countingVisitor);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ return false;
+ }
+ return (0 == countingVisitor.count);
+ }
+ }, 10, 10000);
+ countingVisitor.reset();
+ mmapManager.visitMmaps(countingVisitor);
+ Assert.assertEquals(0, countingVisitor.count);
+ } finally {
+ if (fsIn != null) fsIn.close();
+ if (fs != null) fs.close();
+ if (cluster != null) cluster.shutdown();
+ }
+ }
+
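testZeroCopyMmapCache depends on the deliberately tiny cache configured in
initZeroCopyTest(). The same two keys size the client's mmap cache and bound
how long an unreferenced mmap lingers before it is reaped; a sketch, with
illustrative (not default) values:

    // Sketch: tune the client mmap cache that backs zero-copy reads.
    HdfsConfiguration conf = new HdfsConfiguration();
    conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 16);
    conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 60 * 1000);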
+ /**
+ * Test HDFS fallback reads. HDFS streams support the ByteBufferReadable
+ * interface.
+ */
+ @Test
+ public void testHdfsFallbackReads() throws Exception {
+ HdfsConfiguration conf = initZeroCopyTest();
+ MiniDFSCluster cluster = null;
+ final Path TEST_PATH = new Path("/a");
+ final int TEST_FILE_LENGTH = 16385;
+ final int RANDOM_SEED = 23453;
+ FSDataInputStream fsIn = null;
+
+ DistributedFileSystem fs = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, TEST_PATH,
+ TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
+ try {
+ DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+ } catch (InterruptedException e) {
+ Assert.fail("unexpected InterruptedException during " +
+ "waitReplication: " + e);
+ } catch (TimeoutException e) {
+ Assert.fail("unexpected TimeoutException during " +
+ "waitReplication: " + e);
+ }
+ fsIn = fs.open(TEST_PATH);
+ byte original[] = new byte[TEST_FILE_LENGTH];
+ IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+ fsIn.close();
+ fsIn = fs.open(TEST_PATH);
+ testFallbackImpl(fsIn, original);
+ } finally {
+ if (fsIn != null) fsIn.close();
+ if (fs != null) fs.close();
+ if (cluster != null) cluster.shutdown();
+ }
+ }
+
+ private static class RestrictedAllocatingByteBufferPool
+ implements ByteBufferPool {
+ private final boolean direct;
+
+ RestrictedAllocatingByteBufferPool(boolean direct) {
+ this.direct = direct;
+ }
+ @Override
+ public ByteBuffer getBuffer(boolean direct, int length) {
+ Preconditions.checkArgument(this.direct == direct);
+ return direct ? ByteBuffer.allocateDirect(length) :
+ ByteBuffer.allocate(length);
+ }
+ @Override
+ public void putBuffer(ByteBuffer buffer) {
+ }
+ }
+
+ private static void testFallbackImpl(InputStream stream,
+ byte original[]) throws Exception {
+ RestrictedAllocatingByteBufferPool bufferPool =
+ new RestrictedAllocatingByteBufferPool(
+ stream instanceof ByteBufferReadable);
+
+ ByteBuffer result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10);
+ Assert.assertEquals(10, result.remaining());
+ Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 10),
+ byteBufferToArray(result));
+
+ result = ByteBufferUtil.fallbackRead(stream, bufferPool, 5000);
+ Assert.assertEquals(5000, result.remaining());
+ Assert.assertArrayEquals(Arrays.copyOfRange(original, 10, 5010),
+ byteBufferToArray(result));
+
+ result = ByteBufferUtil.fallbackRead(stream, bufferPool, 9999999);
+ Assert.assertEquals(11375, result.remaining());
+ Assert.assertArrayEquals(Arrays.copyOfRange(original, 5010, 16385),
+ byteBufferToArray(result));
+
+ result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10);
+ Assert.assertNull(result);
+ }
+
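testFallbackImpl also documents ByteBufferUtil.fallbackRead's contract: each
call returns at most maxLength bytes, allocates from the supplied pool
(direct buffers iff the stream is ByteBufferReadable, as
RestrictedAllocatingByteBufferPool asserts), and returns null at end of
stream. A typical consumer loop, as a sketch not part of this commit:

    // Sketch: drain a stream via fallbackRead, recycling pooled buffers.
    static long drain(InputStream stream, ByteBufferPool pool)
        throws IOException {
      long total = 0;
      ByteBuffer buf;
      while ((buf = ByteBufferUtil.fallbackRead(stream, pool, 4096)) != null) {
        total += buf.remaining();
        pool.putBuffer(buf);  // hand the buffer back for reuse
      }
      return total;
    }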
+ /**
+ * Test the {@link ByteBufferUtil#fallbackRead} function directly.
+ */
+ @Test
+ public void testFallbackRead() throws Exception {
+ HdfsConfiguration conf = initZeroCopyTest();
+ MiniDFSCluster cluster = null;
+ final Path TEST_PATH = new Path("/a");
+ final int TEST_FILE_LENGTH = 16385;
+ final int RANDOM_SEED = 23453;
+ FSDataInputStream fsIn = null;
+
+ DistributedFileSystem fs = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, TEST_PATH,
+ TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
+ try {
+ DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+ } catch (InterruptedException e) {
+ Assert.fail("unexpected InterruptedException during " +
+ "waitReplication: " + e);
+ } catch (TimeoutException e) {
+ Assert.fail("unexpected TimeoutException during " +
+ "waitReplication: " + e);
+ }
+ fsIn = fs.open(TEST_PATH);
+ byte original[] = new byte[TEST_FILE_LENGTH];
+ IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+ fsIn.close();
+ fsIn = fs.open(TEST_PATH);
+ testFallbackImpl(fsIn, original);
+ } finally {
+ if (fsIn != null) fsIn.close();
+ if (fs != null) fs.close();
+ if (cluster != null) cluster.shutdown();
+ }
+ }
+
+ /**
+ * Test fallback reads on a stream which does not support the
+ * ByteBufferReadable interface.
+ */
+ @Test
+ public void testIndirectFallbackReads() throws Exception {
+ final File TEST_DIR = new File(
+ System.getProperty("test.build.data","build/test/data"));
+ final String TEST_PATH = TEST_DIR + File.separator +
+ "indirectFallbackTestFile";
+ final int TEST_FILE_LENGTH = 16385;
+ final int RANDOM_SEED = 23453;
+ FileOutputStream fos = null;
+ FileInputStream fis = null;
+ try {
+ fos = new FileOutputStream(TEST_PATH);
+ Random random = new Random(RANDOM_SEED);
+ byte original[] = new byte[TEST_FILE_LENGTH];
+ random.nextBytes(original);
+ fos.write(original);
+ fos.close();
+ fos = null;
+ fis = new FileInputStream(TEST_PATH);
+ testFallbackImpl(fis, original);
+ } finally {
+ IOUtils.cleanup(LOG, fos, fis);
+ new File(TEST_PATH).delete();
+ }
+ }
+}