You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2014/02/01 03:31:15 UTC
svn commit: r1563363 - in /hadoop/common/branches/branch-2/hadoop-common-project: ./ hadoop-common/ hadoop-common/src/ hadoop-common/src/main/java/ hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/ hadoop-common/src/main/java/org/apache/hadoop...
Author: cmccabe
Date: Sat Feb 1 02:31:14 2014
New Revision: 1563363
URL: http://svn.apache.org/r1563363
Log:
HDFS-5746. Add ShortCircuitSharedMemorySegment (cmccabe)
Added:
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.java
- copied unchanged from r1563362, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
- copied unchanged from r1563362, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CloseableReferenceCount.java
- copied unchanged from r1563362, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CloseableReferenceCount.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.c
- copied unchanged from r1563362, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.c
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
- copied unchanged from r1563362, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestSharedFileDescriptorFactory.java
- copied unchanged from r1563362, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestSharedFileDescriptorFactory.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java
- copied unchanged from r1563362, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java
Modified:
hadoop/common/branches/branch-2/hadoop-common-project/ (props changed)
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/ (props changed)
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/pom.xml
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/ (props changed)
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/CMakeLists.txt
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/ (props changed)
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
Propchange: hadoop/common/branches/branch-2/hadoop-common-project/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project:r1563362
Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common:r1563362
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/pom.xml?rev=1563363&r1=1563362&r2=1563363&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/pom.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/pom.xml Sat Feb 1 02:31:14 2014
@@ -545,6 +545,7 @@
<javahClassName>org.apache.hadoop.io.compress.bzip2.Bzip2Decompressor</javahClassName>
<javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsMapping</javahClassName>
<javahClassName>org.apache.hadoop.io.nativeio.NativeIO</javahClassName>
+ <javahClassName>org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory</javahClassName>
<javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.snappy.SnappyCompressor</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.snappy.SnappyDecompressor</javahClassName>
@@ -552,6 +553,7 @@
<javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Decompressor</javahClassName>
<javahClassName>org.apache.hadoop.util.NativeCrc32</javahClassName>
<javahClassName>org.apache.hadoop.net.unix.DomainSocket</javahClassName>
+ <javahClassName>org.apache.hadoop.net.unix.DomainSocketWatcher</javahClassName>
</javahClassNames>
<javahOutputDirectory>${project.build.directory}/native/javah</javahOutputDirectory>
</configuration>
Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src:r1563362
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/CMakeLists.txt?rev=1563363&r1=1563362&r2=1563363&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/CMakeLists.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/CMakeLists.txt Sat Feb 1 02:31:14 2014
@@ -178,7 +178,9 @@ add_dual_library(hadoop
${D}/io/nativeio/NativeIO.c
${D}/io/nativeio/errno_enum.c
${D}/io/nativeio/file_descriptor.c
+ ${D}/io/nativeio/SharedFileDescriptorFactory.c
${D}/net/unix/DomainSocket.c
+ ${D}/net/unix/DomainSocketWatcher.c
${D}/security/JniBasedUnixGroupsMapping.c
${D}/security/JniBasedUnixGroupsNetgroupMapping.c
${D}/security/hadoop_group_info.c
Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:r1563362
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java?rev=1563363&r1=1563362&r2=1563363&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java Sat Feb 1 02:31:14 2014
@@ -487,6 +487,16 @@ public class NativeIO {
new ConcurrentHashMap<Integer, CachedName>();
private enum IdCache { USER, GROUP }
+
+ public final static int MMAP_PROT_READ = 0x1;
+ public final static int MMAP_PROT_WRITE = 0x2;
+ public final static int MMAP_PROT_EXEC = 0x4;
+
+ public static native long mmap(FileDescriptor fd, int prot,
+ boolean shared, long length) throws IOException;
+
+ public static native void munmap(long addr, long length)
+ throws IOException;
}
private static boolean workaroundNonThreadSafePasswdCalls = false;
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java?rev=1563363&r1=1563362&r2=1563363&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java Sat Feb 1 02:31:14 2014
@@ -24,17 +24,15 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.net.SocketException;
-import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.hadoop.util.CloseableReferenceCount;
import com.google.common.annotations.VisibleForTesting;
@@ -132,104 +130,14 @@ public class DomainSocket implements Clo
}
/**
- * Tracks the reference count of the file descriptor, and also whether it is
- * open or closed.
+ * The socket reference count and closed bit.
*/
- private static class Status {
- /**
- * Bit mask representing a closed domain socket.
- */
- private static final int STATUS_CLOSED_MASK = 1 << 30;
-
- /**
- * Status bits
- *
- * Bit 30: 0 = DomainSocket open, 1 = DomainSocket closed
- * Bits 29 to 0: the reference count.
- */
- private final AtomicInteger bits = new AtomicInteger(0);
-
- Status() { }
-
- /**
- * Increment the reference count of the underlying file descriptor.
- *
- * @throws ClosedChannelException If the file descriptor is closed.
- */
- void reference() throws ClosedChannelException {
- int curBits = bits.incrementAndGet();
- if ((curBits & STATUS_CLOSED_MASK) != 0) {
- bits.decrementAndGet();
- throw new ClosedChannelException();
- }
- }
-
- /**
- * Decrement the reference count of the underlying file descriptor.
- *
- * @param checkClosed Whether to throw an exception if the file
- * descriptor is closed.
- *
- * @throws AsynchronousCloseException If the file descriptor is closed and
- * checkClosed is set.
- */
- void unreference(boolean checkClosed) throws AsynchronousCloseException {
- int newCount = bits.decrementAndGet();
- assert (newCount & ~STATUS_CLOSED_MASK) >= 0;
- if (checkClosed && ((newCount & STATUS_CLOSED_MASK) != 0)) {
- throw new AsynchronousCloseException();
- }
- }
-
- /**
- * Return true if the file descriptor is currently open.
- *
- * @return True if the file descriptor is currently open.
- */
- boolean isOpen() {
- return ((bits.get() & STATUS_CLOSED_MASK) == 0);
- }
-
- /**
- * Mark the file descriptor as closed.
- *
- * Once the file descriptor is closed, it cannot be reopened.
- *
- * @return The current reference count.
- * @throws ClosedChannelException If someone else closes the file
- * descriptor before we do.
- */
- int setClosed() throws ClosedChannelException {
- while (true) {
- int curBits = bits.get();
- if ((curBits & STATUS_CLOSED_MASK) != 0) {
- throw new ClosedChannelException();
- }
- if (bits.compareAndSet(curBits, curBits | STATUS_CLOSED_MASK)) {
- return curBits & (~STATUS_CLOSED_MASK);
- }
- }
- }
-
- /**
- * Get the current reference count.
- *
- * @return The current reference count.
- */
- int getReferenceCount() {
- return bits.get() & (~STATUS_CLOSED_MASK);
- }
- }
-
- /**
- * The socket status.
- */
- private final Status status;
+ final CloseableReferenceCount refCount;
/**
* The file descriptor associated with this UNIX domain socket.
*/
- private final int fd;
+ final int fd;
/**
* The path associated with this UNIX domain socket.
@@ -252,13 +160,21 @@ public class DomainSocket implements Clo
private final DomainChannel channel = new DomainChannel();
private DomainSocket(String path, int fd) {
- this.status = new Status();
+ this.refCount = new CloseableReferenceCount();
this.fd = fd;
this.path = path;
}
private static native int bind0(String path) throws IOException;
+ private void unreference(boolean checkClosed) throws ClosedChannelException {
+ if (checkClosed) {
+ refCount.unreferenceCheckClosed();
+ } else {
+ refCount.unreference();
+ }
+ }
+
/**
* Create a new DomainSocket listening on the given path.
*
@@ -308,14 +224,14 @@ public class DomainSocket implements Clo
* @throws SocketTimeoutException If the accept timed out.
*/
public DomainSocket accept() throws IOException {
- status.reference();
+ refCount.reference();
boolean exc = true;
try {
DomainSocket ret = new DomainSocket(path, accept0(fd));
exc = false;
return ret;
} finally {
- status.unreference(exc);
+ unreference(exc);
}
}
@@ -335,14 +251,14 @@ public class DomainSocket implements Clo
return new DomainSocket(path, fd);
}
- /**
- * Return true if the file descriptor is currently open.
- *
- * @return True if the file descriptor is currently open.
- */
- public boolean isOpen() {
- return status.isOpen();
- }
+ /**
+ * Return true if the file descriptor is currently open.
+ *
+ * @return True if the file descriptor is currently open.
+ */
+ public boolean isOpen() {
+ return refCount.isOpen();
+ }
/**
* @return The socket path.
@@ -381,20 +297,20 @@ public class DomainSocket implements Clo
throws IOException;
public void setAttribute(int type, int size) throws IOException {
- status.reference();
+ refCount.reference();
boolean exc = true;
try {
setAttribute0(fd, type, size);
exc = false;
} finally {
- status.unreference(exc);
+ unreference(exc);
}
}
private native int getAttribute0(int fd, int type) throws IOException;
public int getAttribute(int type) throws IOException {
- status.reference();
+ refCount.reference();
int attribute;
boolean exc = true;
try {
@@ -402,7 +318,7 @@ public class DomainSocket implements Clo
exc = false;
return attribute;
} finally {
- status.unreference(exc);
+ unreference(exc);
}
}
@@ -419,9 +335,9 @@ public class DomainSocket implements Clo
@Override
public void close() throws IOException {
// Set the closed bit on this DomainSocket
- int refCount;
+ int count;
try {
- refCount = status.setClosed();
+ count = refCount.setClosed();
} catch (ClosedChannelException e) {
// Someone else already closed the DomainSocket.
return;
@@ -429,7 +345,7 @@ public class DomainSocket implements Clo
// Wait for all references to go away
boolean didShutdown = false;
boolean interrupted = false;
- while (refCount > 0) {
+ while (count > 0) {
if (!didShutdown) {
try {
// Calling shutdown on the socket will interrupt blocking system
@@ -446,7 +362,7 @@ public class DomainSocket implements Clo
} catch (InterruptedException e) {
interrupted = true;
}
- refCount = status.getReferenceCount();
+ count = refCount.getReferenceCount();
}
// At this point, nobody has a reference to the file descriptor,
@@ -478,13 +394,13 @@ public class DomainSocket implements Clo
*/
public void sendFileDescriptors(FileDescriptor descriptors[],
byte jbuf[], int offset, int length) throws IOException {
- status.reference();
+ refCount.reference();
boolean exc = true;
try {
sendFileDescriptors0(fd, descriptors, jbuf, offset, length);
exc = false;
} finally {
- status.unreference(exc);
+ unreference(exc);
}
}
@@ -515,14 +431,14 @@ public class DomainSocket implements Clo
*/
public int receiveFileDescriptors(FileDescriptor[] descriptors,
byte jbuf[], int offset, int length) throws IOException {
- status.reference();
+ refCount.reference();
boolean exc = true;
try {
int nBytes = receiveFileDescriptors0(fd, descriptors, jbuf, offset, length);
exc = false;
return nBytes;
} finally {
- status.unreference(exc);
+ unreference(exc);
}
}
@@ -539,7 +455,7 @@ public class DomainSocket implements Clo
for (int i = 0; i < streams.length; i++) {
streams[i] = null;
}
- status.reference();
+ refCount.reference();
try {
int ret = receiveFileDescriptors0(fd, descriptors, buf, offset, length);
for (int i = 0, j = 0; i < descriptors.length; i++) {
@@ -569,7 +485,7 @@ public class DomainSocket implements Clo
}
}
}
- status.unreference(!success);
+ unreference(!success);
}
}
@@ -593,7 +509,7 @@ public class DomainSocket implements Clo
public class DomainInputStream extends InputStream {
@Override
public int read() throws IOException {
- status.reference();
+ refCount.reference();
boolean exc = true;
try {
byte b[] = new byte[1];
@@ -601,33 +517,33 @@ public class DomainSocket implements Clo
exc = false;
return (ret >= 0) ? b[0] : -1;
} finally {
- status.unreference(exc);
+ unreference(exc);
}
}
@Override
public int read(byte b[], int off, int len) throws IOException {
- status.reference();
+ refCount.reference();
boolean exc = true;
try {
int nRead = DomainSocket.readArray0(DomainSocket.this.fd, b, off, len);
exc = false;
return nRead;
} finally {
- status.unreference(exc);
+ unreference(exc);
}
}
@Override
public int available() throws IOException {
- status.reference();
+ refCount.reference();
boolean exc = true;
try {
int nAvailable = DomainSocket.available0(DomainSocket.this.fd);
exc = false;
return nAvailable;
} finally {
- status.unreference(exc);
+ unreference(exc);
}
}
@@ -649,7 +565,7 @@ public class DomainSocket implements Clo
@Override
public void write(int val) throws IOException {
- status.reference();
+ refCount.reference();
boolean exc = true;
try {
byte b[] = new byte[1];
@@ -657,19 +573,19 @@ public class DomainSocket implements Clo
DomainSocket.writeArray0(DomainSocket.this.fd, b, 0, 1);
exc = false;
} finally {
- status.unreference(exc);
+ unreference(exc);
}
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
- status.reference();
- boolean exc = true;
+ refCount.reference();
+ boolean exc = true;
try {
DomainSocket.writeArray0(DomainSocket.this.fd, b, off, len);
exc = false;
} finally {
- status.unreference(exc);
+ unreference(exc);
}
}
}
@@ -688,7 +604,7 @@ public class DomainSocket implements Clo
@Override
public int read(ByteBuffer dst) throws IOException {
- status.reference();
+ refCount.reference();
boolean exc = true;
try {
int nread = 0;
@@ -710,7 +626,7 @@ public class DomainSocket implements Clo
exc = false;
return nread;
} finally {
- status.unreference(exc);
+ unreference(exc);
}
}
}
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c?rev=1563363&r1=1563362&r2=1563363&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c Sat Feb 1 02:31:14 2014
@@ -18,6 +18,7 @@
#include "org_apache_hadoop.h"
#include "org_apache_hadoop_io_nativeio_NativeIO.h"
+#include "org_apache_hadoop_io_nativeio_NativeIO_POSIX.h"
#ifdef UNIX
#include <assert.h>
@@ -49,6 +50,10 @@
#include "file_descriptor.h"
#include "errno_enum.h"
+#define MMAP_PROT_READ org_apache_hadoop_io_nativeio_NativeIO_POSIX_MMAP_PROT_READ
+#define MMAP_PROT_WRITE org_apache_hadoop_io_nativeio_NativeIO_POSIX_MMAP_PROT_WRITE
+#define MMAP_PROT_EXEC org_apache_hadoop_io_nativeio_NativeIO_POSIX_MMAP_PROT_EXEC
+
// the NativeIO$POSIX$Stat inner class and its constructor
static jclass stat_clazz;
static jmethodID stat_ctor;
@@ -661,6 +666,39 @@ cleanup:
#endif
}
+JNIEXPORT jlong JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_mmap(
+ JNIEnv *env, jclass clazz, jobject jfd, jint jprot,
+ jboolean jshared, jlong length)
+{
+ void *addr = 0;
+ int prot, flags, fd;
+
+ prot = ((jprot & MMAP_PROT_READ) ? PROT_READ : 0) |
+ ((jprot & MMAP_PROT_WRITE) ? PROT_WRITE : 0) |
+ ((jprot & MMAP_PROT_EXEC) ? PROT_EXEC : 0);
+ flags = (jshared == JNI_TRUE) ? MAP_SHARED : MAP_PRIVATE;
+ fd = fd_get(env, jfd);
+ addr = mmap(NULL, length, prot, flags, fd, 0);
+ if (addr == MAP_FAILED) {
+ throw_ioe(env, errno);
+ }
+ return (jlong)(intptr_t)addr;
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_munmap(
+ JNIEnv *env, jclass clazz, jlong jaddr, jlong length)
+{
+ void *addr;
+
+ addr = (void*)(intptr_t)jaddr;
+ if (munmap(addr, length) < 0) {
+ throw_ioe(env, errno);
+ }
+}
+
+
/*
* static native String getGroupName(int gid);
*