Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2014/02/01 03:25:34 UTC
svn commit: r1563362 - in
/hadoop/common/trunk/hadoop-common-project/hadoop-common: ./ src/
src/main/java/org/apache/hadoop/io/nativeio/
src/main/java/org/apache/hadoop/net/unix/
src/main/java/org/apache/hadoop/util/ src/main/native/src/org/apache/hado...
Author: cmccabe
Date: Sat Feb 1 02:25:33 2014
New Revision: 1563362
URL: http://svn.apache.org/r1563362
Log:
HDFS-5746. Add ShortCircuitSharedMemorySegment (cmccabe)
Added:
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CloseableReferenceCount.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.c
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestSharedFileDescriptorFactory.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/pom.xml
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/CMakeLists.txt
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/pom.xml?rev=1563362&r1=1563361&r2=1563362&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/pom.xml (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/pom.xml Sat Feb 1 02:25:33 2014
@@ -543,6 +543,7 @@
<javahClassName>org.apache.hadoop.io.compress.bzip2.Bzip2Decompressor</javahClassName>
<javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsMapping</javahClassName>
<javahClassName>org.apache.hadoop.io.nativeio.NativeIO</javahClassName>
+ <javahClassName>org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory</javahClassName>
<javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.snappy.SnappyCompressor</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.snappy.SnappyDecompressor</javahClassName>
@@ -550,6 +551,7 @@
<javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Decompressor</javahClassName>
<javahClassName>org.apache.hadoop.util.NativeCrc32</javahClassName>
<javahClassName>org.apache.hadoop.net.unix.DomainSocket</javahClassName>
+ <javahClassName>org.apache.hadoop.net.unix.DomainSocketWatcher</javahClassName>
</javahClassNames>
<javahOutputDirectory>${project.build.directory}/native/javah</javahOutputDirectory>
</configuration>
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/CMakeLists.txt?rev=1563362&r1=1563361&r2=1563362&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/CMakeLists.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/CMakeLists.txt Sat Feb 1 02:25:33 2014
@@ -178,7 +178,9 @@ add_dual_library(hadoop
${D}/io/nativeio/NativeIO.c
${D}/io/nativeio/errno_enum.c
${D}/io/nativeio/file_descriptor.c
+ ${D}/io/nativeio/SharedFileDescriptorFactory.c
${D}/net/unix/DomainSocket.c
+ ${D}/net/unix/DomainSocketWatcher.c
${D}/security/JniBasedUnixGroupsMapping.c
${D}/security/JniBasedUnixGroupsNetgroupMapping.c
${D}/security/hadoop_group_info.c
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java?rev=1563362&r1=1563361&r2=1563362&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java Sat Feb 1 02:25:33 2014
@@ -487,6 +487,16 @@ public class NativeIO {
new ConcurrentHashMap<Integer, CachedName>();
private enum IdCache { USER, GROUP }
+
+ public final static int MMAP_PROT_READ = 0x1;
+ public final static int MMAP_PROT_WRITE = 0x2;
+ public final static int MMAP_PROT_EXEC = 0x4;
+
+ public static native long mmap(FileDescriptor fd, int prot,
+ boolean shared, long length) throws IOException;
+
+ public static native void munmap(long addr, long length)
+ throws IOException;
}
private static boolean workaroundNonThreadSafePasswdCalls = false;
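
For context, a minimal sketch of how the new NativeIO.POSIX mmap/munmap
wrappers can be driven from Java. This is illustrative only, not part of
this commit; the 4096-byte length and the "factory" variable are
assumptions (the factory class is added below in this same commit).

    FileInputStream fis = factory.createDescriptor(4096);
    long addr = NativeIO.POSIX.mmap(fis.getFD(),
        NativeIO.POSIX.MMAP_PROT_READ | NativeIO.POSIX.MMAP_PROT_WRITE,
        true /* MAP_SHARED */, 4096);
    try {
      // ... read and write the shared memory segment through addr ...
    } finally {
      NativeIO.POSIX.munmap(addr, 4096);  // unmap before closing the stream
      fis.close();
    }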
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.java?rev=1563362&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.java Sat Feb 1 02:25:33 2014
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.nativeio;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.FileDescriptor;
+
+import org.apache.commons.lang.SystemUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A factory for creating shared file descriptors inside a given directory.
+ * Typically, the directory will be /dev/shm or /tmp.
+ *
+ * We will hand out file descriptors that correspond to unlinked files residing
+ * in that directory. These file descriptors are suitable for sharing across
+ * multiple processes and are both readable and writable.
+ *
+ * Because we unlink the temporary files right after creating them, a JVM crash
+ * usually does not leave behind any temporary files in the directory. However,
+ * it may happen that we crash right after creating the file and before
+ * unlinking it. In the constructor, we attempt to clean up after any such
+ * remnants by trying to unlink any temporary files created by previous
+ * SharedFileDescriptorFactory instances that also used our prefix.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SharedFileDescriptorFactory {
+ private final String prefix;
+ private final String path;
+
+ /**
+ * Create a SharedFileDescriptorFactory.
+ *
+ * @param prefix Prefix to add to all file names we use.
+ * @param path Path to use.
+ */
+ public SharedFileDescriptorFactory(String prefix, String path)
+ throws IOException {
+ Preconditions.checkArgument(NativeIO.isAvailable());
+ Preconditions.checkArgument(SystemUtils.IS_OS_UNIX);
+ this.prefix = prefix;
+ this.path = path;
+ deleteStaleTemporaryFiles0(prefix, path);
+ }
+
+ /**
+ * Create a shared file descriptor which will be both readable and writable.
+ *
+ * @param length The starting file length.
+ *
+ * @return The file descriptor, wrapped in a FileInputStream.
+ * @throws IOException If there was an I/O or configuration error creating
+ * the descriptor.
+ */
+ public FileInputStream createDescriptor(int length) throws IOException {
+ return new FileInputStream(createDescriptor0(prefix, path, length));
+ }
+
+ /**
+ * Delete temporary files in the directory, NOT following symlinks.
+ */
+ private static native void deleteStaleTemporaryFiles0(String prefix,
+ String path) throws IOException;
+
+ /**
+ * Create a file with O_EXCL, and then resize it to the desired size.
+ */
+ private static native FileDescriptor createDescriptor0(String prefix,
+ String path, int length) throws IOException;
+}
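
A minimal usage sketch for the factory above (illustrative, not part of
this commit; the "shm_" prefix and the /dev/shm path are assumptions):

    SharedFileDescriptorFactory factory =
        new SharedFileDescriptorFactory("shm_", "/dev/shm");
    FileInputStream stream = factory.createDescriptor(8192);
    try {
      // The stream wraps a descriptor backed by an already-unlinked file,
      // so it is both readable and writable, and can be handed to another
      // process, e.g. via DomainSocket#sendFileDescriptors.
      FileDescriptor fd = stream.getFD();
      // ... share fd, or map it with NativeIO.POSIX.mmap ...
    } finally {
      stream.close();
    }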
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java?rev=1563362&r1=1563361&r2=1563362&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java Sat Feb 1 02:25:33 2014
@@ -24,17 +24,15 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.net.SocketException;
-import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.hadoop.util.CloseableReferenceCount;
import com.google.common.annotations.VisibleForTesting;
@@ -132,104 +130,14 @@ public class DomainSocket implements Clo
}
/**
- * Tracks the reference count of the file descriptor, and also whether it is
- * open or closed.
+ * The socket reference count and closed bit.
*/
- private static class Status {
- /**
- * Bit mask representing a closed domain socket.
- */
- private static final int STATUS_CLOSED_MASK = 1 << 30;
-
- /**
- * Status bits
- *
- * Bit 30: 0 = DomainSocket open, 1 = DomainSocket closed
- * Bits 29 to 0: the reference count.
- */
- private final AtomicInteger bits = new AtomicInteger(0);
-
- Status() { }
-
- /**
- * Increment the reference count of the underlying file descriptor.
- *
- * @throws ClosedChannelException If the file descriptor is closed.
- */
- void reference() throws ClosedChannelException {
- int curBits = bits.incrementAndGet();
- if ((curBits & STATUS_CLOSED_MASK) != 0) {
- bits.decrementAndGet();
- throw new ClosedChannelException();
- }
- }
-
- /**
- * Decrement the reference count of the underlying file descriptor.
- *
- * @param checkClosed Whether to throw an exception if the file
- * descriptor is closed.
- *
- * @throws AsynchronousCloseException If the file descriptor is closed and
- * checkClosed is set.
- */
- void unreference(boolean checkClosed) throws AsynchronousCloseException {
- int newCount = bits.decrementAndGet();
- assert (newCount & ~STATUS_CLOSED_MASK) >= 0;
- if (checkClosed && ((newCount & STATUS_CLOSED_MASK) != 0)) {
- throw new AsynchronousCloseException();
- }
- }
-
- /**
- * Return true if the file descriptor is currently open.
- *
- * @return True if the file descriptor is currently open.
- */
- boolean isOpen() {
- return ((bits.get() & STATUS_CLOSED_MASK) == 0);
- }
-
- /**
- * Mark the file descriptor as closed.
- *
- * Once the file descriptor is closed, it cannot be reopened.
- *
- * @return The current reference count.
- * @throws ClosedChannelException If someone else closes the file
- * descriptor before we do.
- */
- int setClosed() throws ClosedChannelException {
- while (true) {
- int curBits = bits.get();
- if ((curBits & STATUS_CLOSED_MASK) != 0) {
- throw new ClosedChannelException();
- }
- if (bits.compareAndSet(curBits, curBits | STATUS_CLOSED_MASK)) {
- return curBits & (~STATUS_CLOSED_MASK);
- }
- }
- }
-
- /**
- * Get the current reference count.
- *
- * @return The current reference count.
- */
- int getReferenceCount() {
- return bits.get() & (~STATUS_CLOSED_MASK);
- }
- }
-
- /**
- * The socket status.
- */
- private final Status status;
+ final CloseableReferenceCount refCount;
/**
* The file descriptor associated with this UNIX domain socket.
*/
- private final int fd;
+ final int fd;
/**
* The path associated with this UNIX domain socket.
@@ -252,13 +160,21 @@ public class DomainSocket implements Clo
private final DomainChannel channel = new DomainChannel();
private DomainSocket(String path, int fd) {
- this.status = new Status();
+ this.refCount = new CloseableReferenceCount();
this.fd = fd;
this.path = path;
}
private static native int bind0(String path) throws IOException;
+ private void unreference(boolean checkClosed) throws ClosedChannelException {
+ if (checkClosed) {
+ refCount.unreferenceCheckClosed();
+ } else {
+ refCount.unreference();
+ }
+ }
+
/**
* Create a new DomainSocket listening on the given path.
*
@@ -308,14 +224,14 @@ public class DomainSocket implements Clo
* @throws SocketTimeoutException If the accept timed out.
*/
public DomainSocket accept() throws IOException {
- status.reference();
+ refCount.reference();
boolean exc = true;
try {
DomainSocket ret = new DomainSocket(path, accept0(fd));
exc = false;
return ret;
} finally {
- status.unreference(exc);
+ unreference(exc);
}
}
@@ -335,14 +251,14 @@ public class DomainSocket implements Clo
return new DomainSocket(path, fd);
}
- /**
- * Return true if the file descriptor is currently open.
- *
- * @return True if the file descriptor is currently open.
- */
- public boolean isOpen() {
- return status.isOpen();
- }
+ /**
+ * Return true if the file descriptor is currently open.
+ *
+ * @return True if the file descriptor is currently open.
+ */
+ public boolean isOpen() {
+ return refCount.isOpen();
+ }
/**
* @return The socket path.
@@ -381,20 +297,20 @@ public class DomainSocket implements Clo
throws IOException;
public void setAttribute(int type, int size) throws IOException {
- status.reference();
+ refCount.reference();
boolean exc = true;
try {
setAttribute0(fd, type, size);
exc = false;
} finally {
- status.unreference(exc);
+ unreference(exc);
}
}
private native int getAttribute0(int fd, int type) throws IOException;
public int getAttribute(int type) throws IOException {
- status.reference();
+ refCount.reference();
int attribute;
boolean exc = true;
try {
@@ -402,7 +318,7 @@ public class DomainSocket implements Clo
exc = false;
return attribute;
} finally {
- status.unreference(exc);
+ unreference(exc);
}
}
@@ -419,9 +335,9 @@ public class DomainSocket implements Clo
@Override
public void close() throws IOException {
// Set the closed bit on this DomainSocket
- int refCount;
+ int count;
try {
- refCount = status.setClosed();
+ count = refCount.setClosed();
} catch (ClosedChannelException e) {
// Someone else already closed the DomainSocket.
return;
@@ -429,7 +345,7 @@ public class DomainSocket implements Clo
// Wait for all references to go away
boolean didShutdown = false;
boolean interrupted = false;
- while (refCount > 0) {
+ while (count > 0) {
if (!didShutdown) {
try {
// Calling shutdown on the socket will interrupt blocking system
@@ -446,7 +362,7 @@ public class DomainSocket implements Clo
} catch (InterruptedException e) {
interrupted = true;
}
- refCount = status.getReferenceCount();
+ count = refCount.getReferenceCount();
}
// At this point, nobody has a reference to the file descriptor,
@@ -478,13 +394,13 @@ public class DomainSocket implements Clo
*/
public void sendFileDescriptors(FileDescriptor descriptors[],
byte jbuf[], int offset, int length) throws IOException {
- status.reference();
+ refCount.reference();
boolean exc = true;
try {
sendFileDescriptors0(fd, descriptors, jbuf, offset, length);
exc = false;
} finally {
- status.unreference(exc);
+ unreference(exc);
}
}
@@ -515,14 +431,14 @@ public class DomainSocket implements Clo
*/
public int receiveFileDescriptors(FileDescriptor[] descriptors,
byte jbuf[], int offset, int length) throws IOException {
- status.reference();
+ refCount.reference();
boolean exc = true;
try {
int nBytes = receiveFileDescriptors0(fd, descriptors, jbuf, offset, length);
exc = false;
return nBytes;
} finally {
- status.unreference(exc);
+ unreference(exc);
}
}
@@ -539,7 +455,7 @@ public class DomainSocket implements Clo
for (int i = 0; i < streams.length; i++) {
streams[i] = null;
}
- status.reference();
+ refCount.reference();
try {
int ret = receiveFileDescriptors0(fd, descriptors, buf, offset, length);
for (int i = 0, j = 0; i < descriptors.length; i++) {
@@ -569,7 +485,7 @@ public class DomainSocket implements Clo
}
}
}
- status.unreference(!success);
+ unreference(!success);
}
}
@@ -593,7 +509,7 @@ public class DomainSocket implements Clo
public class DomainInputStream extends InputStream {
@Override
public int read() throws IOException {
- status.reference();
+ refCount.reference();
boolean exc = true;
try {
byte b[] = new byte[1];
@@ -601,33 +517,33 @@ public class DomainSocket implements Clo
exc = false;
return (ret >= 0) ? b[0] : -1;
} finally {
- status.unreference(exc);
+ unreference(exc);
}
}
@Override
public int read(byte b[], int off, int len) throws IOException {
- status.reference();
+ refCount.reference();
boolean exc = true;
try {
int nRead = DomainSocket.readArray0(DomainSocket.this.fd, b, off, len);
exc = false;
return nRead;
} finally {
- status.unreference(exc);
+ unreference(exc);
}
}
@Override
public int available() throws IOException {
- status.reference();
+ refCount.reference();
boolean exc = true;
try {
int nAvailable = DomainSocket.available0(DomainSocket.this.fd);
exc = false;
return nAvailable;
} finally {
- status.unreference(exc);
+ unreference(exc);
}
}
@@ -649,7 +565,7 @@ public class DomainSocket implements Clo
@Override
public void write(int val) throws IOException {
- status.reference();
+ refCount.reference();
boolean exc = true;
try {
byte b[] = new byte[1];
@@ -657,19 +573,19 @@ public class DomainSocket implements Clo
DomainSocket.writeArray0(DomainSocket.this.fd, b, 0, 1);
exc = false;
} finally {
- status.unreference(exc);
+ unreference(exc);
}
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
- status.reference();
- boolean exc = true;
+ refCount.reference();
+ boolean exc = true;
try {
DomainSocket.writeArray0(DomainSocket.this.fd, b, off, len);
exc = false;
} finally {
- status.unreference(exc);
+ unreference(exc);
}
}
}
@@ -688,7 +604,7 @@ public class DomainSocket implements Clo
@Override
public int read(ByteBuffer dst) throws IOException {
- status.reference();
+ refCount.reference();
boolean exc = true;
try {
int nread = 0;
@@ -710,7 +626,7 @@ public class DomainSocket implements Clo
exc = false;
return nread;
} finally {
- status.unreference(exc);
+ unreference(exc);
}
}
}
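
All of the methods converted above follow the same guard pattern around
the native call; schematically (doNativeCall is a placeholder, not a real
method):

    refCount.reference();        // throws ClosedChannelException if closed
    boolean exc = true;
    try {
      doNativeCall(fd);
      exc = false;
    } finally {
      // On the failure path this also checks whether a concurrent close()
      // was the cause, surfacing it as AsynchronousCloseException.
      unreference(exc);
    }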
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java?rev=1563362&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java Sat Feb 1 02:25:33 2014
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net.unix;
+
+import java.io.Closeable;
+import java.io.EOFException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.IOUtils;
+
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.TreeMap;
+import java.util.Map;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.lang.SystemUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.NativeCodeLoader;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+/**
+ * The DomainSocketWatcher watches a set of domain sockets to see when they
+ * become readable or closed. When one of those events happens, it makes a
+ * callback.
+ *
+ * See {@link DomainSocket} for more information about UNIX domain sockets.
+ */
+@InterfaceAudience.LimitedPrivate("HDFS")
+public final class DomainSocketWatcher extends Thread implements Closeable {
+ static {
+ if (SystemUtils.IS_OS_WINDOWS) {
+ loadingFailureReason = "UNIX Domain sockets are not available on Windows.";
+ } else if (!NativeCodeLoader.isNativeCodeLoaded()) {
+ loadingFailureReason = "libhadoop cannot be loaded.";
+ } else {
+ String problem;
+ try {
+ anchorNative();
+ problem = null;
+ } catch (Throwable t) {
+ problem = "DomainSocketWatcher#anchorNative got error: " +
+ t.getMessage();
+ }
+ loadingFailureReason = problem;
+ }
+ }
+
+ static Log LOG = LogFactory.getLog(DomainSocketWatcher.class);
+
+ /**
+ * The reason why DomainSocketWatcher is not available, or null if it is
+ * available.
+ */
+ private final static String loadingFailureReason;
+
+ /**
+ * Initializes the native library code.
+ */
+ private static native void anchorNative();
+
+ interface Handler {
+ /**
+ * Handles an event on a socket. An event may be the socket becoming
+ * readable, or the remote end being closed.
+ *
+ * @param sock The socket that the event occurred on.
+ * @return Whether we should close the socket.
+ */
+ boolean handle(DomainSocket sock);
+ }
+
+ /**
+ * Handler for {@link DomainSocketWatcher#notificationSockets notificationSockets[1]}.
+ */
+ private class NotificationHandler implements Handler {
+ public boolean handle(DomainSocket sock) {
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": NotificationHandler: doing a read on " +
+ sock.fd);
+ }
+ if (sock.getInputStream().read() == -1) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": NotificationHandler: got EOF on " + sock.fd);
+ }
+ throw new EOFException();
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": NotificationHandler: read succeeded on " +
+ sock.fd);
+ }
+ return false;
+ } catch (IOException e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": NotificationHandler: setting closed to " +
+ "true for " + sock.fd);
+ }
+ closed = true;
+ return true;
+ }
+ }
+ }
+
+ private static class Entry {
+ final DomainSocket socket;
+ final Handler handler;
+
+ Entry(DomainSocket socket, Handler handler) {
+ this.socket = socket;
+ this.handler = handler;
+ }
+
+ DomainSocket getDomainSocket() {
+ return socket;
+ }
+
+ Handler getHandler() {
+ return handler;
+ }
+ }
+
+ /**
+ * The FdSet is a set of file descriptors that gets passed to poll(2).
+ * It contains a native memory segment, so that we don't have to copy
+ * in the poll0 function.
+ */
+ private static class FdSet {
+ private long data;
+
+ private native static long alloc0();
+
+ FdSet() {
+ data = alloc0();
+ }
+
+ /**
+ * Add a file descriptor to the set.
+ *
+ * @param fd The file descriptor to add.
+ */
+ native void add(int fd);
+
+ /**
+ * Remove a file descriptor from the set.
+ *
+ * @param fd The file descriptor to remove.
+ */
+ native void remove(int fd);
+
+ /**
+ * Get an array containing all the FDs marked as readable.
+ * Also clear the state of all FDs.
+ *
+ * @return An array containing all of the currently readable file
+ * descriptors.
+ */
+ native int[] getAndClearReadableFds();
+
+ /**
+ * Close the object and de-allocate the memory used.
+ */
+ native void close();
+ }
+
+ /**
+ * Lock which protects toAdd, toRemove, and closed.
+ */
+ private final ReentrantLock lock = new ReentrantLock();
+
+ /**
+ * Condition variable which indicates that toAdd and toRemove have been
+ * processed.
+ */
+ private final Condition processedCond = lock.newCondition();
+
+ /**
+ * Entries to add.
+ */
+ private final LinkedList<Entry> toAdd =
+ new LinkedList<Entry>();
+
+ /**
+ * Entries to remove.
+ */
+ private final TreeMap<Integer, DomainSocket> toRemove =
+ new TreeMap<Integer, DomainSocket>();
+
+ /**
+ * Maximum length of time to go between checking whether the interrupted
+ * bit has been set for this thread.
+ */
+ private final int interruptCheckPeriodMs;
+
+ /**
+ * A pair of sockets used to wake up the thread after it has called poll(2).
+ */
+ private final DomainSocket notificationSockets[];
+
+ /**
+ * Whether or not this DomainSocketWatcher is closed.
+ */
+ private boolean closed = false;
+
+ public DomainSocketWatcher(int interruptCheckPeriodMs) throws IOException {
+ if (loadingFailureReason != null) {
+ throw new UnsupportedOperationException(loadingFailureReason);
+ }
+ notificationSockets = DomainSocket.socketpair();
+ this.interruptCheckPeriodMs = interruptCheckPeriodMs;
+ Preconditions.checkArgument(interruptCheckPeriodMs > 0);
+ watcherThread.start();
+ }
+
+ /**
+ * Close the DomainSocketWatcher and wait for its thread to terminate.
+ *
+ * If there is more than one close, all but the first will be ignored.
+ */
+ @Override
+ public void close() throws IOException {
+ try {
+ lock.lock();
+ if (closed) return;
+ LOG.info(this + ": closing");
+ closed = true;
+ } finally {
+ lock.unlock();
+ }
+ // Close notificationSockets[0], so that notificationSockets[1] gets an EOF
+ // event. This will wake up the thread immediately if it is blocked inside
+ // the select() system call.
+ notificationSockets[0].close();
+ // Wait for the select thread to terminate.
+ Uninterruptibles.joinUninterruptibly(watcherThread);
+ }
+
+ /**
+ * Add a socket.
+ *
+ * @param sock The socket to add. It is an error to re-add a socket that
+ * we are already watching.
+ * @param handler The handler to associate with this socket. This may be
+ * called any time after this function is called.
+ */
+ public void add(DomainSocket sock, Handler handler) {
+ try {
+ lock.lock();
+ checkNotClosed();
+ Entry entry = new Entry(sock, handler);
+ try {
+ sock.refCount.reference();
+ } catch (ClosedChannelException e) {
+ Preconditions.checkArgument(false,
+ "tried to add a closed DomainSocket to " + this);
+ }
+ toAdd.add(entry);
+ kick();
+ while (true) {
+ try {
+ processedCond.await();
+ } catch (InterruptedException e) {
+ this.interrupt();
+ }
+ if (!toAdd.contains(entry)) {
+ break;
+ }
+ checkNotClosed();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Remove a socket. Its handler will be called.
+ *
+ * @param sock The socket to remove.
+ */
+ public void remove(DomainSocket sock) {
+ try {
+ lock.lock();
+ checkNotClosed();
+ toRemove.put(sock.fd, sock);
+ kick();
+ while (true) {
+ try {
+ processedCond.await();
+ } catch (InterruptedException e) {
+ this.interrupt();
+ }
+ if (!toRemove.containsKey(sock.fd)) {
+ break;
+ }
+ checkNotClosed();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Wake up the DomainSocketWatcher thread.
+ */
+ private void kick() {
+ try {
+ notificationSockets[0].getOutputStream().write(0);
+ } catch (IOException e) {
+ LOG.error(this + ": error writing to notificationSockets[0]", e);
+ }
+ }
+
+ /**
+ * Check that the DomainSocketWatcher is not closed.
+ * Must be called while holding the lock.
+ */
+ private void checkNotClosed() {
+ Preconditions.checkState(lock.isHeldByCurrentThread());
+ if (closed) {
+ throw new RuntimeException("DomainSocketWatcher is closed.");
+ }
+ }
+
+ private void sendCallback(String caller, TreeMap<Integer, Entry> entries,
+ FdSet fdSet, int fd) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": " + caller + " starting sendCallback for fd " + fd);
+ }
+ Entry entry = entries.get(fd);
+ Preconditions.checkNotNull(entry,
+ this + ": fdSet contained " + fd + ", which we were " +
+ "not tracking.");
+ DomainSocket sock = entry.getDomainSocket();
+ if (entry.getHandler().handle(sock)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": " + caller + ": closing fd " + fd +
+ " at the request of the handler.");
+ }
+ if (toRemove.remove(fd) != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": " + caller + " : sendCallback processed fd " +
+ fd + " in toRemove.");
+ }
+ }
+ try {
+ sock.refCount.unreferenceCheckClosed();
+ } catch (IOException e) {
+ Preconditions.checkArgument(false,
+ this + ": file descriptor " + sock.fd + " was closed while " +
+ "still in the poll(2) loop.");
+ }
+ IOUtils.cleanup(LOG, sock);
+ entries.remove(fd);
+ fdSet.remove(fd);
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": " + caller + ": sendCallback not " +
+ "closing fd " + fd);
+ }
+ }
+ }
+
+ private final Thread watcherThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ LOG.info(this + ": starting with interruptCheckPeriodMs = " +
+ interruptCheckPeriodMs);
+ final TreeMap<Integer, Entry> entries = new TreeMap<Integer, Entry>();
+ FdSet fdSet = new FdSet();
+ addNotificationSocket(entries, fdSet);
+ try {
+ while (true) {
+ lock.lock();
+ try {
+ for (int fd : fdSet.getAndClearReadableFds()) {
+ sendCallback("getAndClearReadableFds", entries, fdSet, fd);
+ }
+ if (!(toAdd.isEmpty() && toRemove.isEmpty())) {
+ // Handle pending additions (before pending removes).
+ for (Iterator<Entry> iter = toAdd.iterator(); iter.hasNext(); ) {
+ Entry entry = iter.next();
+ DomainSocket sock = entry.getDomainSocket();
+ Entry prevEntry = entries.put(sock.fd, entry);
+ Preconditions.checkState(prevEntry == null,
+ this + ": tried to watch a file descriptor that we " +
+ "were already watching: " + sock);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": adding fd " + sock.fd);
+ }
+ fdSet.add(sock.fd);
+ iter.remove();
+ }
+ // Handle pending removals
+ while (true) {
+ Map.Entry<Integer, DomainSocket> entry = toRemove.firstEntry();
+ if (entry == null) break;
+ sendCallback("handlePendingRemovals",
+ entries, fdSet, entry.getValue().fd);
+ }
+ processedCond.signalAll();
+ }
+ // Check if the thread should terminate. Doing this check now is
+ // easier than at the beginning of the loop, since we know toAdd and
+ // toRemove are now empty and processedCond has been notified if it
+ // needed to be.
+ if (closed) {
+ LOG.info(toString() + " thread terminating.");
+ return;
+ }
+ // Check if someone sent our thread an InterruptedException while we
+ // were waiting in poll().
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ } finally {
+ lock.unlock();
+ }
+ doPoll0(interruptCheckPeriodMs, fdSet);
+ }
+ } catch (InterruptedException e) {
+ LOG.info(toString() + " terminating on InterruptedException");
+ } catch (IOException e) {
+ LOG.error(toString() + " terminating on IOException", e);
+ } finally {
+ for (Entry entry : entries.values()) {
+ sendCallback("close", entries, fdSet, entry.getDomainSocket().fd);
+ }
+ entries.clear();
+ fdSet.close();
+ }
+ }
+ });
+
+ private void addNotificationSocket(final TreeMap<Integer, Entry> entries,
+ FdSet fdSet) {
+ entries.put(notificationSockets[1].fd,
+ new Entry(notificationSockets[1], new NotificationHandler()));
+ try {
+ notificationSockets[1].refCount.reference();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ fdSet.add(notificationSockets[1].fd);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": adding notificationSocket " +
+ notificationSockets[1].fd + ", connected to " +
+ notificationSockets[0].fd);
+ }
+ }
+
+ public String toString() {
+ return "DomainSocketWatcher(" + System.identityHashCode(this) + ")";
+ }
+
+ private static native int doPoll0(int maxWaitMs, FdSet readFds)
+ throws IOException;
+}
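
The notification-socket trick above can be shown in isolation (a toy
sketch, not part of this commit): the watcher thread polls one end of a
socketpair, so writing a single byte to the other end makes poll(2) return
immediately, and closing that end delivers the EOF that tells the thread
to shut down.

    DomainSocket pair[] = DomainSocket.socketpair();
    // watcher side: pair[1] sits in the FdSet handed to doPoll0()
    pair[0].getOutputStream().write(0);  // "kick": wake the poll right away
    pair[0].close();                     // EOF on pair[1]: request shutdown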
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CloseableReferenceCount.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CloseableReferenceCount.java?rev=1563362&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CloseableReferenceCount.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CloseableReferenceCount.java Sat Feb 1 02:25:33 2014
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A closeable object that maintains a reference count.
+ *
+ * Once the object is closed, attempting to take a new reference will throw
+ * ClosedChannelException.
+ */
+public class CloseableReferenceCount {
+ /**
+ * Bit mask representing a closed domain socket.
+ */
+ private static final int STATUS_CLOSED_MASK = 1 << 30;
+
+ /**
+ * The status bits.
+ *
+ * Bit 30: 0 = open, 1 = closed.
+ * Bits 29 to 0: the reference count.
+ */
+ private final AtomicInteger status = new AtomicInteger(0);
+
+ public CloseableReferenceCount() { }
+
+ /**
+ * Increment the reference count.
+ *
+ * @throws ClosedChannelException If the status is closed.
+ */
+ public void reference() throws ClosedChannelException {
+ int curBits = status.incrementAndGet();
+ if ((curBits & STATUS_CLOSED_MASK) != 0) {
+ status.decrementAndGet();
+ throw new ClosedChannelException();
+ }
+ }
+
+ /**
+ * Decrement the reference count.
+ *
+ * @return True if the object is closed and has no outstanding
+ * references.
+ */
+ public boolean unreference() {
+ int newVal = status.decrementAndGet();
+ Preconditions.checkState(newVal != 0xffffffff,
+ "called unreference when the reference count was already at 0.");
+ return newVal == STATUS_CLOSED_MASK;
+ }
+
+ /**
+ * Decrement the reference count, checking to make sure that the
+ * CloseableReferenceCount is not closed.
+ *
+ * @throws AsynchronousCloseException If the status is closed.
+ */
+ public void unreferenceCheckClosed() throws ClosedChannelException {
+ int newVal = status.decrementAndGet();
+ if ((newVal & STATUS_CLOSED_MASK) != 0) {
+ throw new AsynchronousCloseException();
+ }
+ }
+
+ /**
+ * Return true if the status is currently open.
+ *
+ * @return True if the status is currently open.
+ */
+ public boolean isOpen() {
+ return ((status.get() & STATUS_CLOSED_MASK) == 0);
+ }
+
+ /**
+ * Mark the status as closed.
+ *
+ * Once the status is closed, it cannot be reopened.
+ *
+ * @return The current reference count.
+ * @throws ClosedChannelException If someone else closes the object
+ * before we do.
+ */
+ public int setClosed() throws ClosedChannelException {
+ while (true) {
+ int curBits = status.get();
+ if ((curBits & STATUS_CLOSED_MASK) != 0) {
+ throw new ClosedChannelException();
+ }
+ if (status.compareAndSet(curBits, curBits | STATUS_CLOSED_MASK)) {
+ return curBits & (~STATUS_CLOSED_MASK);
+ }
+ }
+ }
+
+ /**
+ * Get the current reference count.
+ *
+ * @return The current reference count.
+ */
+ public int getReferenceCount() {
+ return status.get() & (~STATUS_CLOSED_MASK);
+ }
+}
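
A sketch of the intended usage, mirroring the DomainSocket changes earlier
in this commit (NativeResource and the drain logic are hypothetical):

    import java.io.Closeable;
    import java.io.IOException;
    import org.apache.hadoop.util.CloseableReferenceCount;

    public class NativeResource implements Closeable {
      private final CloseableReferenceCount refCount =
          new CloseableReferenceCount();

      public void use() throws IOException {
        refCount.reference();      // fails once close() has started
        try {
          // ... operate on the underlying file descriptor ...
        } finally {
          refCount.unreferenceCheckClosed();
        }
      }

      @Override
      public void close() throws IOException {
        // After setClosed(), no new references can be taken; a second
        // close() throws ClosedChannelException (DomainSocket catches it).
        int remaining = refCount.setClosed();
        // ... wait for the remaining in-flight users to drain, then
        // release the underlying descriptor ...
      }
    }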
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c?rev=1563362&r1=1563361&r2=1563362&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c Sat Feb 1 02:25:33 2014
@@ -18,6 +18,7 @@
#include "org_apache_hadoop.h"
#include "org_apache_hadoop_io_nativeio_NativeIO.h"
+#include "org_apache_hadoop_io_nativeio_NativeIO_POSIX.h"
#ifdef UNIX
#include <assert.h>
@@ -49,6 +50,10 @@
#include "file_descriptor.h"
#include "errno_enum.h"
+#define MMAP_PROT_READ org_apache_hadoop_io_nativeio_NativeIO_POSIX_MMAP_PROT_READ
+#define MMAP_PROT_WRITE org_apache_hadoop_io_nativeio_NativeIO_POSIX_MMAP_PROT_WRITE
+#define MMAP_PROT_EXEC org_apache_hadoop_io_nativeio_NativeIO_POSIX_MMAP_PROT_EXEC
+
// the NativeIO$POSIX$Stat inner class and its constructor
static jclass stat_clazz;
static jmethodID stat_ctor;
@@ -661,6 +666,39 @@ cleanup:
#endif
}
+JNIEXPORT jlong JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_mmap(
+ JNIEnv *env, jclass clazz, jobject jfd, jint jprot,
+ jboolean jshared, jlong length)
+{
+ void *addr = 0;
+ int prot, flags, fd;
+
+ prot = ((jprot & MMAP_PROT_READ) ? PROT_READ : 0) |
+ ((jprot & MMAP_PROT_WRITE) ? PROT_WRITE : 0) |
+ ((jprot & MMAP_PROT_EXEC) ? PROT_EXEC : 0);
+ flags = (jshared == JNI_TRUE) ? MAP_SHARED : MAP_PRIVATE;
+ fd = fd_get(env, jfd);
+ addr = mmap(NULL, length, prot, flags, fd, 0);
+ if (addr == MAP_FAILED) {
+ throw_ioe(env, errno);
+ }
+ return (jlong)(intptr_t)addr;
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_munmap(
+ JNIEnv *env, jclass clazz, jlong jaddr, jlong length)
+{
+ void *addr;
+
+ addr = (void*)(intptr_t)jaddr;
+ if (munmap(addr, length) < 0) {
+ throw_ioe(env, errno);
+ }
+}
+
+
/*
* static native String getGroupName(int gid);
*
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.c?rev=1563362&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.c (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.c Sat Feb 1 02:25:33 2014
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "org_apache_hadoop.h"
+
+#ifdef UNIX
+
+#include "exception.h"
+#include "file_descriptor.h"
+#include "org_apache_hadoop.h"
+#include "org_apache_hadoop_io_nativeio_SharedFileDescriptorFactory.h"
+
+#include <dirent.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <limits.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+static pthread_mutex_t g_rand_lock = PTHREAD_MUTEX_INITIALIZER;
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_nativeio_SharedFileDescriptorFactory_deleteStaleTemporaryFiles0(
+ JNIEnv *env, jclass clazz, jstring jprefix, jstring jpath)
+{
+ const char *prefix = NULL, *path = NULL;
+ char target[PATH_MAX];
+ jthrowable jthr;
+ DIR *dp = NULL;
+ struct dirent *de;
+
+ prefix = (*env)->GetStringUTFChars(env, jprefix, NULL);
+ if (!prefix) goto done; // exception raised
+ path = (*env)->GetStringUTFChars(env, jpath, NULL);
+ if (!path) goto done; // exception raised
+
+ dp = opendir(path);
+ if (!dp) {
+ int ret = errno;
+ jthr = newIOException(env, "opendir(%s) error %d: %s",
+ path, ret, terror(ret));
+ (*env)->Throw(env, jthr);
+ goto done;
+ }
+ while ((de = readdir(dp))) {
+ if (strncmp(prefix, de->d_name, strlen(prefix)) == 0) {
+ int ret = snprintf(target, PATH_MAX, "%s/%s", path, de->d_name);
+ if ((0 < ret) && (ret < PATH_MAX)) {
+ unlink(target);
+ }
+ }
+ }
+
+done:
+ if (dp) {
+ closedir(dp);
+ }
+ if (prefix) {
+ (*env)->ReleaseStringUTFChars(env, jprefix, prefix);
+ }
+ if (path) {
+ (*env)->ReleaseStringUTFChars(env, jpath, path);
+ }
+}
+
+JNIEXPORT jobject JNICALL
+Java_org_apache_hadoop_io_nativeio_SharedFileDescriptorFactory_createDescriptor0(
+ JNIEnv *env, jclass clazz, jstring jprefix, jstring jpath, jint length)
+{
+ const char *prefix = NULL, *path = NULL;
+ char target[PATH_MAX];
+ int ret, fd = -1, rnd;
+ jthrowable jthr;
+ jobject jret = NULL;
+
+ prefix = (*env)->GetStringUTFChars(env, jprefix, NULL);
+ if (!prefix) goto done; // exception raised
+ path = (*env)->GetStringUTFChars(env, jpath, NULL);
+ if (!path) goto done; // exception raised
+
+ pthread_mutex_lock(&g_rand_lock);
+ rnd = rand();
+ pthread_mutex_unlock(&g_rand_lock);
+ while (1) {
+ ret = snprintf(target, PATH_MAX, "%s/%s_%d",
+ path, prefix, rnd);
+ if (ret < 0) {
+ jthr = newIOException(env, "snprintf error");
+ (*env)->Throw(env, jthr);
+ goto done;
+ } else if (ret >= PATH_MAX) {
+ jthr = newIOException(env, "computed path was too long.");
+ (*env)->Throw(env, jthr);
+ goto done;
+ }
+ fd = open(target, O_CREAT | O_EXCL | O_RDWR, 0700);
+ if (fd >= 0) break; // success
+ ret = errno;
+ if (ret == EEXIST) {
+ // Bad luck -- we got a very rare collision here between us and
+ // another DataNode (or process). Try again.
+ continue;
+ } else if (ret == EINTR) {
+ // Most of the time, this error is only possible when opening FIFOs.
+ // But let's be thorough.
+ continue;
+ }
+ jthr = newIOException(env, "open(%s, O_CREAT | O_EXCL | O_RDWR) "
+ "failed: error %d (%s)", target, ret, terror(ret));
+ (*env)->Throw(env, jthr);
+ goto done;
+ }
+ if (unlink(target) < 0) {
+ jthr = newIOException(env, "unlink(%s) failed: error %d (%s)",
+ path, ret, terror(ret));
+ (*env)->Throw(env, jthr);
+ goto done;
+ }
+ if (ftruncate(fd, length) < 0) {
+ jthr = newIOException(env, "ftruncate(%s, %d) failed: error %d (%s)",
+ path, length, ret, terror(ret));
+ (*env)->Throw(env, jthr);
+ goto done;
+ }
+ jret = fd_create(env, fd); // throws exception on error.
+
+done:
+ if (prefix) {
+ (*env)->ReleaseStringUTFChars(env, jprefix, prefix);
+ }
+ if (path) {
+ (*env)->ReleaseStringUTFChars(env, jpath, path);
+ }
+ if (!jret) {
+ if (fd >= 0) {
+ close(fd);
+ }
+ }
+ return jret;
+}
+
+#endif
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c?rev=1563362&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c Sat Feb 1 02:25:33 2014
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "config.h"
+#include "exception.h"
+#include "org_apache_hadoop.h"
+#include "org_apache_hadoop_net_unix_DomainSocketWatcher.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <jni.h>
+#include <poll.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/select.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+static jfieldID fd_set_data_fid;
+
+#define FD_SET_DATA_MIN_SIZE 2
+
+struct fd_set_data {
+ /**
+ * Number of fds we have allocated space for.
+ */
+ int alloc_size;
+
+ /**
+ * Number of fds actually in use.
+ */
+ int used_size;
+
+ /**
+ * Beginning of pollfd data.
+ */
+ struct pollfd pollfd[0];
+};
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocketWatcher_anchorNative(
+JNIEnv *env, jclass clazz)
+{
+ jclass fd_set_class;
+
+ fd_set_class = (*env)->FindClass(env,
+ "org/apache/hadoop/net/unix/DomainSocketWatcher$FdSet");
+ if (!fd_set_class) return; // exception raised
+ fd_set_data_fid = (*env)->GetFieldID(env, fd_set_class, "data", "J");
+ if (!fd_set_data_fid) return; // exception raised
+}
+
+JNIEXPORT jlong JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocketWatcher_00024FdSet_alloc0(
+JNIEnv *env, jclass clazz)
+{
+ struct fd_set_data *sd;
+
+ sd = calloc(1, sizeof(struct fd_set_data) +
+ (sizeof(struct pollfd) * FD_SET_DATA_MIN_SIZE));
+ if (!sd) {
+ (*env)->Throw(env, newRuntimeException(env, "out of memory allocating "
+ "DomainSocketWatcher#FdSet"));
+ return 0L;
+ }
+ sd->alloc_size = FD_SET_DATA_MIN_SIZE;
+ sd->used_size = 0;
+ return (jlong)(intptr_t)sd;
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocketWatcher_00024FdSet_add(
+JNIEnv *env, jobject obj, jint fd)
+{
+ struct fd_set_data *sd, *nd;
+ struct pollfd *pollfd;
+
+ sd = (struct fd_set_data*)(intptr_t)(*env)->
+ GetLongField(env, obj, fd_set_data_fid);
+ if (sd->used_size + 1 > sd->alloc_size) {
+ nd = realloc(sd, sizeof(struct fd_set_data) +
+ (sizeof(struct pollfd) * sd->alloc_size * 2));
+ if (!nd) {
+ (*env)->Throw(env, newRuntimeException(env, "out of memory adding "
+ "another fd to DomainSocketWatcher#FdSet. we have %d already",
+ sd->alloc_size));
+ return;
+ }
+ nd->alloc_size = nd->alloc_size * 2;
+ (*env)->SetLongField(env, obj, fd_set_data_fid, (jlong)(intptr_t)nd);
+ sd = nd;
+ }
+ pollfd = &sd->pollfd[sd->used_size];
+ sd->used_size++;
+ pollfd->fd = fd;
+ pollfd->events = POLLIN;
+ pollfd->revents = 0;
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocketWatcher_00024FdSet_remove(
+JNIEnv *env, jobject obj, jint fd)
+{
+ struct fd_set_data *sd;
+ struct pollfd *pollfd, *last_pollfd;
+ int used_size, i;
+
+ sd = (struct fd_set_data*)(intptr_t)(*env)->
+ GetLongField(env, obj, fd_set_data_fid);
+ used_size = sd->used_size;
+ for (i = 0; i < used_size; i++) {
+ pollfd = sd->pollfd + i;
+ if (pollfd->fd == fd) break;
+ }
+ if (i == used_size) {
+ (*env)->Throw(env, newRuntimeException(env, "failed to remove fd %d "
+ "from the FdSet because it was never present.", fd));
+ return;
+ }
+ last_pollfd = sd->pollfd + (used_size - 1);
+ if (used_size > 1) {
+ // Move last pollfd to the new empty slot if needed
+ pollfd->fd = last_pollfd->fd;
+ pollfd->events = last_pollfd->events;
+ pollfd->revents = last_pollfd->revents;
+ }
+ memset(last_pollfd, 0, sizeof(struct pollfd));
+ sd->used_size--;
+}
+
+JNIEXPORT jobject JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocketWatcher_00024FdSet_getAndClearReadableFds(
+JNIEnv *env, jobject obj)
+{
+ int *carr = NULL;
+ jobject jarr = NULL;
+ struct fd_set_data *sd;
+ int used_size, num_readable = 0, i, j;
+ jthrowable jthr = NULL;
+
+ sd = (struct fd_set_data*)(intptr_t)(*env)->
+ GetLongField(env, obj, fd_set_data_fid);
+ used_size = sd->used_size;
+ for (i = 0; i < used_size; i++) {
+ if (sd->pollfd[i].revents & POLLIN) {
+ num_readable++;
+ } else {
+ sd->pollfd[i].revents = 0;
+ }
+ }
+ if (num_readable > 0) {
+ carr = malloc(sizeof(int) * num_readable);
+ if (!carr) {
+ jthr = newRuntimeException(env, "failed to allocate a temporary array "
+ "of %d ints", num_readable);
+ goto done;
+ }
+ j = 0;
+ for (i = 0; ((i < used_size) && (j < num_readable)); i++) {
+ if (sd->pollfd[i].revents & POLLIN) {
+ carr[j] = sd->pollfd[i].fd;
+ j++;
+ sd->pollfd[i].revents = 0;
+ }
+ }
+ if (j != num_readable) {
+ jthr = newRuntimeException(env, "failed to fill entire carr "
+ "array of size %d: only filled %d elements", num_readable, j);
+ goto done;
+ }
+ }
+ jarr = (*env)->NewIntArray(env, num_readable);
+ if (!jarr) {
+ jthr = (*env)->ExceptionOccurred(env);
+ (*env)->ExceptionClear(env);
+ goto done;
+ }
+ if (num_readable > 0) {
+ (*env)->SetIntArrayRegion(env, jarr, 0, num_readable, carr);
+ jthr = (*env)->ExceptionOccurred(env);
+ if (jthr) {
+ (*env)->ExceptionClear(env);
+ goto done;
+ }
+ }
+
+done:
+ free(carr);
+  if (jthr) {
+    (*env)->DeleteLocalRef(env, jarr);
+    jarr = NULL;
+    (*env)->Throw(env, jthr);
+  }
+ return jarr;
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocketWatcher_00024FdSet_close(
+JNIEnv *env, jobject obj)
+{
+ struct fd_set_data *sd;
+
+ sd = (struct fd_set_data*)(intptr_t)(*env)->
+ GetLongField(env, obj, fd_set_data_fid);
+ if (sd) {
+ free(sd);
+ (*env)->SetLongField(env, obj, fd_set_data_fid, 0L);
+ }
+}
+
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocketWatcher_doPoll0(
+JNIEnv *env, jclass clazz, jint checkMs, jobject fdSet)
+{
+ struct fd_set_data *sd;
+ int ret, err;
+
+ sd = (struct fd_set_data*)(intptr_t)(*env)->
+ GetLongField(env, fdSet, fd_set_data_fid);
+ ret = poll(sd->pollfd, sd->used_size, checkMs);
+ if (ret >= 0) {
+ return ret;
+ }
+ err = errno;
+ if (err != EINTR) { // treat EINTR as 0 fds ready
+ (*env)->Throw(env, newIOException(env,
+ "poll(2) failed with error code %d: %s", err, terror(err)));
+ }
+ return 0;
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestSharedFileDescriptorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestSharedFileDescriptorFactory.java?rev=1563362&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestSharedFileDescriptorFactory.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestSharedFileDescriptorFactory.java Sat Feb 1 02:25:33 2014
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.nativeio;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+import org.apache.commons.lang.SystemUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+public class TestSharedFileDescriptorFactory {
+ static final Log LOG = LogFactory.getLog(TestSharedFileDescriptorFactory.class);
+
+ private static final File TEST_BASE =
+ new File(System.getProperty("test.build.data", "/tmp"));
+
+ @Test(timeout=10000)
+ public void testReadAndWrite() throws Exception {
+ Assume.assumeTrue(NativeIO.isAvailable());
+ Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
+ File path = new File(TEST_BASE, "testReadAndWrite");
+ path.mkdirs();
+ SharedFileDescriptorFactory factory =
+ new SharedFileDescriptorFactory("woot_", path.getAbsolutePath());
+ FileInputStream inStream = factory.createDescriptor(4096);
+ FileOutputStream outStream = new FileOutputStream(inStream.getFD());
+ outStream.write(101);
+ inStream.getChannel().position(0);
+ Assert.assertEquals(101, inStream.read());
+ inStream.close();
+ outStream.close();
+ FileUtil.fullyDelete(path);
+ }
+
+ static private void createTempFile(String path) throws Exception {
+ FileOutputStream fos = new FileOutputStream(path);
+ fos.write(101);
+ fos.close();
+ }
+
+ @Test(timeout=10000)
+ public void testCleanupRemainders() throws Exception {
+ Assume.assumeTrue(NativeIO.isAvailable());
+ Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
+ File path = new File(TEST_BASE, "testCleanupRemainders");
+ path.mkdirs();
+ String remainder1 = path.getAbsolutePath() +
+ Path.SEPARATOR + "woot2_remainder1";
+ String remainder2 = path.getAbsolutePath() +
+ Path.SEPARATOR + "woot2_remainder2";
+ createTempFile(remainder1);
+ createTempFile(remainder2);
+ new SharedFileDescriptorFactory("woot2_", path.getAbsolutePath());
+ // creating the SharedFileDescriptorFactory should have removed
+ // the remainders
+ Assert.assertFalse(new File(remainder1).exists());
+ Assert.assertFalse(new File(remainder2).exists());
+ FileUtil.fullyDelete(path);
+ }
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java?rev=1563362&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java Sat Feb 1 02:25:33 2014
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net.unix;
+
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+public class TestDomainSocketWatcher {
+ static final Log LOG = LogFactory.getLog(TestDomainSocketWatcher.class);
+
+ @Before
+ public void before() {
+ Assume.assumeTrue(DomainSocket.getLoadingFailureReason() == null);
+ }
+
+ /**
+ * Test that we can create a DomainSocketWatcher and then shut it down.
+ */
+ @Test(timeout=60000)
+ public void testCreateShutdown() throws Exception {
+ DomainSocketWatcher watcher = new DomainSocketWatcher(10000000);
+ watcher.close();
+ }
+
+ /**
+ * Test that we can get notifications out a DomainSocketWatcher.
+ */
+ @Test(timeout=180000)
+ public void testDeliverNotifications() throws Exception {
+ DomainSocketWatcher watcher = new DomainSocketWatcher(10000000);
+ DomainSocket pair[] = DomainSocket.socketpair();
+ final CountDownLatch latch = new CountDownLatch(1);
+ watcher.add(pair[1], new DomainSocketWatcher.Handler() {
+ @Override
+ public boolean handle(DomainSocket sock) {
+ latch.countDown();
+ return true;
+ }
+ });
+ pair[0].close();
+ latch.await();
+ watcher.close();
+ }
+
+ /**
+   * Test that a Java interruption can stop the watcher thread.
+ */
+ @Test(timeout=60000)
+ public void testInterruption() throws Exception {
+ DomainSocketWatcher watcher = new DomainSocketWatcher(10);
+ watcher.interrupt();
+ Uninterruptibles.joinUninterruptibly(watcher);
+ }
+
+ @Test(timeout=300000)
+ public void testStress() throws Exception {
+ final int SOCKET_NUM = 250;
+ final ReentrantLock lock = new ReentrantLock();
+ final DomainSocketWatcher watcher = new DomainSocketWatcher(10000000);
+ final ArrayList<DomainSocket[]> pairs = new ArrayList<DomainSocket[]>();
+ final AtomicInteger handled = new AtomicInteger(0);
+
+ final Thread adderThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < SOCKET_NUM; i++) {
+ DomainSocket pair[] = DomainSocket.socketpair();
+ watcher.add(pair[1], new DomainSocketWatcher.Handler() {
+ @Override
+ public boolean handle(DomainSocket sock) {
+ handled.incrementAndGet();
+ return true;
+ }
+ });
+ lock.lock();
+ try {
+ pairs.add(pair);
+ } finally {
+ lock.unlock();
+ }
+ }
+ } catch (Throwable e) {
+ LOG.error(e);
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ final Thread removerThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ final Random random = new Random();
+ try {
+ while (handled.get() != SOCKET_NUM) {
+ lock.lock();
+ try {
+ if (!pairs.isEmpty()) {
+ int idx = random.nextInt(pairs.size());
+ DomainSocket pair[] = pairs.remove(idx);
+ if (random.nextBoolean()) {
+ pair[0].close();
+ } else {
+ watcher.remove(pair[1]);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ } catch (Throwable e) {
+ LOG.error(e);
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ adderThread.start();
+ removerThread.start();
+ Uninterruptibles.joinUninterruptibly(adderThread);
+ Uninterruptibles.joinUninterruptibly(removerThread);
+ watcher.close();
+ }
+}