You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2014/02/01 03:25:34 UTC

svn commit: r1563362 - in /hadoop/common/trunk/hadoop-common-project/hadoop-common: ./ src/ src/main/java/org/apache/hadoop/io/nativeio/ src/main/java/org/apache/hadoop/net/unix/ src/main/java/org/apache/hadoop/util/ src/main/native/src/org/apache/hado...

Author: cmccabe
Date: Sat Feb  1 02:25:33 2014
New Revision: 1563362

URL: http://svn.apache.org/r1563362
Log:
HDFS-5746.  Add ShortCircuitSharedMemorySegment (cmccabe)

Added:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CloseableReferenceCount.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.c
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestSharedFileDescriptorFactory.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java
Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/pom.xml
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/CMakeLists.txt
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/pom.xml?rev=1563362&r1=1563361&r2=1563362&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/pom.xml (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/pom.xml Sat Feb  1 02:25:33 2014
@@ -543,6 +543,7 @@
                     <javahClassName>org.apache.hadoop.io.compress.bzip2.Bzip2Decompressor</javahClassName>
                     <javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsMapping</javahClassName>
                     <javahClassName>org.apache.hadoop.io.nativeio.NativeIO</javahClassName>
+                    <javahClassName>org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory</javahClassName>
                     <javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping</javahClassName>
                     <javahClassName>org.apache.hadoop.io.compress.snappy.SnappyCompressor</javahClassName>
                     <javahClassName>org.apache.hadoop.io.compress.snappy.SnappyDecompressor</javahClassName>
@@ -550,6 +551,7 @@
                     <javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Decompressor</javahClassName>
                     <javahClassName>org.apache.hadoop.util.NativeCrc32</javahClassName>
                     <javahClassName>org.apache.hadoop.net.unix.DomainSocket</javahClassName>
+                    <javahClassName>org.apache.hadoop.net.unix.DomainSocketWatcher</javahClassName>
                   </javahClassNames>
                   <javahOutputDirectory>${project.build.directory}/native/javah</javahOutputDirectory>
                 </configuration>

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/CMakeLists.txt?rev=1563362&r1=1563361&r2=1563362&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/CMakeLists.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/CMakeLists.txt Sat Feb  1 02:25:33 2014
@@ -178,7 +178,9 @@ add_dual_library(hadoop
     ${D}/io/nativeio/NativeIO.c
     ${D}/io/nativeio/errno_enum.c
     ${D}/io/nativeio/file_descriptor.c
+    ${D}/io/nativeio/SharedFileDescriptorFactory.c
     ${D}/net/unix/DomainSocket.c
+    ${D}/net/unix/DomainSocketWatcher.c
     ${D}/security/JniBasedUnixGroupsMapping.c
     ${D}/security/JniBasedUnixGroupsNetgroupMapping.c
     ${D}/security/hadoop_group_info.c

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java?rev=1563362&r1=1563361&r2=1563362&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java Sat Feb  1 02:25:33 2014
@@ -487,6 +487,16 @@ public class NativeIO {
       new ConcurrentHashMap<Integer, CachedName>();
 
     private enum IdCache { USER, GROUP }
+
+    public final static int MMAP_PROT_READ = 0x1; 
+    public final static int MMAP_PROT_WRITE = 0x2; 
+    public final static int MMAP_PROT_EXEC = 0x4; 
+
+    public static native long mmap(FileDescriptor fd, int prot,
+        boolean shared, long length) throws IOException;
+
+    public static native void munmap(long addr, long length)
+        throws IOException;
   }
 
   private static boolean workaroundNonThreadSafePasswdCalls = false;

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.java?rev=1563362&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.java Sat Feb  1 02:25:33 2014
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.nativeio;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.FileDescriptor;
+
+import org.apache.commons.lang.SystemUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A factory for creating shared file descriptors inside a given directory.
+ * Typically, the directory will be /dev/shm or /tmp.
+ *
+ * We will hand out file descriptors that correspond to unlinked files residing
+ * in that directory.  These file descriptors are suitable for sharing across
+ * multiple processes and are both readable and writable.
+ *
+ * Because we unlink the temporary files right after creating them, a JVM crash
+ * usually does not leave behind any temporary files in the directory.  However,
+ * it may happen that we crash right after creating the file and before
+ * unlinking it.  In the constructor, we attempt to clean up after any such
+ * remnants by trying to unlink any temporary files created by previous
+ * SharedFileDescriptorFactory instances that also used our prefix.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SharedFileDescriptorFactory {
+  private final String prefix;
+  private final String path;
+
+  /**
+   * Create a SharedFileDescriptorFactory and delete stale temporary files.
+   * @param prefix    Prefix to add to all file names we use.
+   * @param path      Directory to create the files in, e.g. /dev/shm or /tmp.
+   * @throws IOException   If the native stale-file cleanup reports an error.
+   */
+  public SharedFileDescriptorFactory(String prefix, String path)
+      throws IOException {
+    Preconditions.checkArgument(NativeIO.isAvailable());
+    Preconditions.checkArgument(SystemUtils.IS_OS_UNIX);
+    this.prefix = prefix;
+    this.path = path;
+    deleteStaleTemporaryFiles0(prefix, path);
+  }
+
+  /**
+   * Create a shared file descriptor which will be both readable and writable.
+   *
+   * @param length         The starting file length.
+   *
+   * @return               The file descriptor, wrapped in a FileInputStream.
+   * @throws IOException   If there was an I/O or configuration error creating
+   *                       the descriptor.
+   */
+  public FileInputStream createDescriptor(int length) throws IOException {
+    return new FileInputStream(createDescriptor0(prefix, path, length));
+  }
+
+  /**
+   * Delete temporary files in the directory, NOT following symlinks.
+   */
+  private static native void deleteStaleTemporaryFiles0(String prefix,
+      String path) throws IOException;
+
+  /**
+   * Create a file with O_EXCL, and then resize it to the desired size.
+   */
+  private static native FileDescriptor createDescriptor0(String prefix,
+      String path, int length) throws IOException;
+}

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java?rev=1563362&r1=1563361&r2=1563362&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java Sat Feb  1 02:25:33 2014
@@ -24,17 +24,15 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.net.SocketException;
-import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang.SystemUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.hadoop.util.CloseableReferenceCount;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -132,104 +130,14 @@ public class DomainSocket implements Clo
   }
 
   /**
-   * Tracks the reference count of the file descriptor, and also whether it is
-   * open or closed.
+   * The socket reference count and closed bit.
    */
-  private static class Status {
-    /**
-     * Bit mask representing a closed domain socket. 
-     */
-    private static final int STATUS_CLOSED_MASK = 1 << 30;
-    
-    /**
-     * Status bits
-     * 
-     * Bit 30: 0 = DomainSocket open, 1 = DomainSocket closed
-     * Bits 29 to 0: the reference count.
-     */
-    private final AtomicInteger bits = new AtomicInteger(0);
-
-    Status() { }
-
-    /**
-     * Increment the reference count of the underlying file descriptor.
-     *
-     * @throws ClosedChannelException      If the file descriptor is closed.
-     */
-    void reference() throws ClosedChannelException {
-      int curBits = bits.incrementAndGet();
-      if ((curBits & STATUS_CLOSED_MASK) != 0) {
-        bits.decrementAndGet();
-        throw new ClosedChannelException();
-      }
-    }
-
-    /**
-     * Decrement the reference count of the underlying file descriptor.
-     *
-     * @param checkClosed        Whether to throw an exception if the file
-     *                           descriptor is closed.
-     *
-     * @throws AsynchronousCloseException  If the file descriptor is closed and
-     *                                     checkClosed is set.
-     */
-    void unreference(boolean checkClosed) throws AsynchronousCloseException {
-      int newCount = bits.decrementAndGet();
-      assert (newCount & ~STATUS_CLOSED_MASK) >= 0;
-      if (checkClosed && ((newCount & STATUS_CLOSED_MASK) != 0)) {
-        throw new AsynchronousCloseException();
-      }
-    }
-
-    /**
-     * Return true if the file descriptor is currently open.
-     * 
-     * @return                 True if the file descriptor is currently open.
-     */
-    boolean isOpen() {
-      return ((bits.get() & STATUS_CLOSED_MASK) == 0);
-    }
-
-    /**
-     * Mark the file descriptor as closed.
-     *
-     * Once the file descriptor is closed, it cannot be reopened.
-     *
-     * @return                         The current reference count.
-     * @throws ClosedChannelException  If someone else closes the file 
-     *                                 descriptor before we do.
-     */
-    int setClosed() throws ClosedChannelException {
-      while (true) {
-        int curBits = bits.get();
-        if ((curBits & STATUS_CLOSED_MASK) != 0) {
-          throw new ClosedChannelException();
-        }
-        if (bits.compareAndSet(curBits, curBits | STATUS_CLOSED_MASK)) {
-          return curBits & (~STATUS_CLOSED_MASK);
-        }
-      }
-    }
-
-    /**
-     * Get the current reference count.
-     *
-     * @return                 The current reference count.
-     */
-    int getReferenceCount() {
-      return bits.get() & (~STATUS_CLOSED_MASK);
-    }
-  }
-
-  /**
-   * The socket status.
-   */
-  private final Status status;
+  final CloseableReferenceCount refCount;
 
   /**
    * The file descriptor associated with this UNIX domain socket.
    */
-  private final int fd;
+  final int fd;
 
   /**
    * The path associated with this UNIX domain socket.
@@ -252,13 +160,21 @@ public class DomainSocket implements Clo
   private final DomainChannel channel = new DomainChannel();
 
   private DomainSocket(String path, int fd) {
-    this.status = new Status();
+    this.refCount = new CloseableReferenceCount();
     this.fd = fd;
     this.path = path;
   }
 
   private static native int bind0(String path) throws IOException;
 
+  private void unreference(boolean checkClosed) throws ClosedChannelException {
+    if (checkClosed) {
+      refCount.unreferenceCheckClosed();
+    } else {
+      refCount.unreference();
+    }
+  }
+
   /**
    * Create a new DomainSocket listening on the given path.
    *
@@ -308,14 +224,14 @@ public class DomainSocket implements Clo
    * @throws SocketTimeoutException       If the accept timed out.
    */
   public DomainSocket accept() throws IOException {
-    status.reference();
+    refCount.reference();
     boolean exc = true;
     try {
       DomainSocket ret = new DomainSocket(path, accept0(fd));
       exc = false;
       return ret;
     } finally {
-      status.unreference(exc);
+      unreference(exc);
     }
   }
 
@@ -335,14 +251,14 @@ public class DomainSocket implements Clo
     return new DomainSocket(path, fd);
   }
 
- /**
-  * Return true if the file descriptor is currently open.
-  *
-  * @return                 True if the file descriptor is currently open.
-  */
- public boolean isOpen() {
-   return status.isOpen();
- }
+  /**
+   * Return true if the file descriptor is currently open.
+   *
+   * @return                 True if the file descriptor is currently open.
+   */
+  public boolean isOpen() {
+    return refCount.isOpen();
+  }
 
   /**
    * @return                 The socket path.
@@ -381,20 +297,20 @@ public class DomainSocket implements Clo
       throws IOException;
 
   public void setAttribute(int type, int size) throws IOException {
-    status.reference();
+    refCount.reference();
     boolean exc = true;
     try {
       setAttribute0(fd, type, size);
       exc = false;
     } finally {
-      status.unreference(exc);
+      unreference(exc);
     }
   }
 
   private native int getAttribute0(int fd, int type) throws IOException;
 
   public int getAttribute(int type) throws IOException {
-    status.reference();
+    refCount.reference();
     int attribute;
     boolean exc = true;
     try {
@@ -402,7 +318,7 @@ public class DomainSocket implements Clo
       exc = false;
       return attribute;
     } finally {
-      status.unreference(exc);
+      unreference(exc);
     }
   }
 
@@ -419,9 +335,9 @@ public class DomainSocket implements Clo
   @Override
   public void close() throws IOException {
     // Set the closed bit on this DomainSocket
-    int refCount;
+    int count;
     try {
-      refCount = status.setClosed();
+      count = refCount.setClosed();
     } catch (ClosedChannelException e) {
       // Someone else already closed the DomainSocket.
       return;
@@ -429,7 +345,7 @@ public class DomainSocket implements Clo
     // Wait for all references to go away
     boolean didShutdown = false;
     boolean interrupted = false;
-    while (refCount > 0) {
+    while (count > 0) {
       if (!didShutdown) {
         try {
           // Calling shutdown on the socket will interrupt blocking system
@@ -446,7 +362,7 @@ public class DomainSocket implements Clo
       } catch (InterruptedException e) {
         interrupted = true;
       }
-      refCount = status.getReferenceCount();
+      count = refCount.getReferenceCount();
     }
 
     // At this point, nobody has a reference to the file descriptor, 
@@ -478,13 +394,13 @@ public class DomainSocket implements Clo
    */
   public void sendFileDescriptors(FileDescriptor descriptors[],
       byte jbuf[], int offset, int length) throws IOException {
-    status.reference();
+    refCount.reference();
     boolean exc = true;
     try {
       sendFileDescriptors0(fd, descriptors, jbuf, offset, length);
       exc = false;
     } finally {
-      status.unreference(exc);
+      unreference(exc);
     }
   }
 
@@ -515,14 +431,14 @@ public class DomainSocket implements Clo
    */
   public int receiveFileDescriptors(FileDescriptor[] descriptors,
       byte jbuf[], int offset, int length) throws IOException {
-    status.reference();
+    refCount.reference();
     boolean exc = true;
     try {
       int nBytes = receiveFileDescriptors0(fd, descriptors, jbuf, offset, length);
       exc = false;
       return nBytes;
     } finally {
-      status.unreference(exc);
+      unreference(exc);
     }
   }
 
@@ -539,7 +455,7 @@ public class DomainSocket implements Clo
     for (int i = 0; i < streams.length; i++) {
       streams[i] = null;
     }
-    status.reference();
+    refCount.reference();
     try {
       int ret = receiveFileDescriptors0(fd, descriptors, buf, offset, length);
       for (int i = 0, j = 0; i < descriptors.length; i++) {
@@ -569,7 +485,7 @@ public class DomainSocket implements Clo
           }
         }
       }
-      status.unreference(!success);
+      unreference(!success);
     }
   }
 
@@ -593,7 +509,7 @@ public class DomainSocket implements Clo
   public class DomainInputStream extends InputStream {
     @Override
     public int read() throws IOException {
-      status.reference();
+      refCount.reference();
       boolean exc = true;
       try {
         byte b[] = new byte[1];
@@ -601,33 +517,33 @@ public class DomainSocket implements Clo
         exc = false;
         return (ret >= 0) ? b[0] : -1;
       } finally {
-        status.unreference(exc);
+        unreference(exc);
       }
     }
     
     @Override
     public int read(byte b[], int off, int len) throws IOException {
-      status.reference();
+      refCount.reference();
       boolean exc = true;
       try {
         int nRead = DomainSocket.readArray0(DomainSocket.this.fd, b, off, len);
         exc = false;
         return nRead;
       } finally {
-        status.unreference(exc);
+        unreference(exc);
       }
     }
 
     @Override
     public int available() throws IOException {
-      status.reference();
+      refCount.reference();
       boolean exc = true;
       try {
         int nAvailable = DomainSocket.available0(DomainSocket.this.fd);
         exc = false;
         return nAvailable;
       } finally {
-        status.unreference(exc);
+        unreference(exc);
       }
     }
 
@@ -649,7 +565,7 @@ public class DomainSocket implements Clo
 
     @Override
     public void write(int val) throws IOException {
-      status.reference();
+      refCount.reference();
       boolean exc = true;
       try {
         byte b[] = new byte[1];
@@ -657,19 +573,19 @@ public class DomainSocket implements Clo
         DomainSocket.writeArray0(DomainSocket.this.fd, b, 0, 1);
         exc = false;
       } finally {
-        status.unreference(exc);
+        unreference(exc);
       }
     }
 
     @Override
     public void write(byte[] b, int off, int len) throws IOException {
-      status.reference();
-        boolean exc = true;
+      refCount.reference();
+      boolean exc = true;
       try {
         DomainSocket.writeArray0(DomainSocket.this.fd, b, off, len);
         exc = false;
       } finally {
-        status.unreference(exc);
+        unreference(exc);
       }
     }
   }
@@ -688,7 +604,7 @@ public class DomainSocket implements Clo
 
     @Override
     public int read(ByteBuffer dst) throws IOException {
-      status.reference();
+      refCount.reference();
       boolean exc = true;
       try {
         int nread = 0;
@@ -710,7 +626,7 @@ public class DomainSocket implements Clo
         exc = false;
         return nread;
       } finally {
-        status.unreference(exc);
+        unreference(exc);
       }
     }
   }

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java?rev=1563362&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java Sat Feb  1 02:25:33 2014
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net.unix;
+
+import java.io.Closeable;
+import java.io.EOFException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.IOUtils;
+
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.TreeMap;
+import java.util.Map;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.lang.SystemUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.NativeCodeLoader;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+/**
+ * The DomainSocketWatcher watches a set of domain sockets to see when they
+ * become readable, or closed.  When one of those events happens, it makes a
+ * callback.
+ *
+ * See {@link DomainSocket} for more information about UNIX domain sockets.
+ */
+@InterfaceAudience.LimitedPrivate("HDFS")
+public final class DomainSocketWatcher extends Thread implements Closeable {
+  static {
+    if (SystemUtils.IS_OS_WINDOWS) {
+      loadingFailureReason = "UNIX Domain sockets are not available on Windows.";
+    } else if (!NativeCodeLoader.isNativeCodeLoaded()) {
+      loadingFailureReason = "libhadoop cannot be loaded.";
+    } else {
+      String problem;
+      try {
+        anchorNative();
+        problem = null;
+      } catch (Throwable t) {
+        problem = "DomainSocketWatcher#anchorNative got error: " +
+          t.getMessage();
+      }
+      loadingFailureReason = problem;
+    }
+  }
+
+  static Log LOG = LogFactory.getLog(DomainSocketWatcher.class);
+
+  /**
+   * The reason why DomainSocketWatcher is not available, or null if it is
+   * available.
+   */
+  private final static String loadingFailureReason;
+
+  /**
+   * Initializes the native library code.
+   */
+  private static native void anchorNative();
+
+  interface Handler {
+    /**
+     * Handles an event on a socket.  An event may be the socket becoming
+     * readable, or the remote end being closed.
+     *
+     * @param sock    The socket that the event occurred on.
+     * @return        Whether we should close the socket.
+     */
+    boolean handle(DomainSocket sock);
+  }
+
+  /**
+   * Handler for {@code notificationSockets[1]}: reads one byte per event.
+   */
+  private class NotificationHandler implements Handler {
+    public boolean handle(DomainSocket sock) {
+      try {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": NotificationHandler: doing a read on " +
+            sock.fd);
+        }
+        if (sock.getInputStream().read() == -1) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": NotificationHandler: got EOF on " + sock.fd);
+          }
+          throw new EOFException();  // handled by the catch block below
+        }
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": NotificationHandler: read succeeded on " +
+            sock.fd);
+        }
+        return false;  // byte consumed; keep the socket open
+      } catch (IOException e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": NotificationHandler: setting closed to " +
+              "true for " + sock.fd);
+        }
+        closed = true;  // EOF or I/O error marks the watcher closed
+        return true;  // ask the caller to close the socket
+      }
+    }
+  }
+
+  private static class Entry {
+    final DomainSocket socket;
+    final Handler handler;
+
+    Entry(DomainSocket socket, Handler handler) {
+      this.socket = socket;
+      this.handler = handler;
+    }
+
+    DomainSocket getDomainSocket() {
+      return socket;
+    }
+
+    Handler getHandler() {
+      return handler;
+    }
+  }
+
+  /**
+   * The FdSet is a set of file descriptors that gets passed to poll(2).
+   * It contains a native memory segment, so that we don't have to copy
+   * in the poll0 function.
+   */
+  private static class FdSet {
+    private long data;
+
+    private native static long alloc0();
+
+    FdSet() {
+      data = alloc0();
+    }
+
+    /**
+     * Add a file descriptor to the set.
+     *
+     * @param fd   The file descriptor to add.
+     */
+    native void add(int fd);
+
+    /**
+     * Remove a file descriptor from the set.
+     *
+     * @param fd   The file descriptor to remove.
+     */
+    native void remove(int fd);
+
+    /**
+     * Get an array containing all the FDs marked as readable.
+     * Also clear the state of all FDs.
+     *
+     * @return     An array containing all of the currently readable file
+     *             descriptors.
+     */
+    native int[] getAndClearReadableFds();
+
+    /**
+     * Close the object and de-allocate the memory used.
+     */
+    native void close();
+  }
+
+  /**
+   * Lock which protects toAdd, toRemove, and closed.
+   */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  /**
+   * Condition variable which indicates that toAdd and toRemove have been
+   * processed.
+   */
+  private final Condition processedCond = lock.newCondition();
+
+  /**
+   * Entries to add.
+   */
+  private final LinkedList<Entry> toAdd =
+      new LinkedList<Entry>();
+
+  /**
+   * Entries to remove.
+   */
+  private final TreeMap<Integer, DomainSocket> toRemove =
+      new TreeMap<Integer, DomainSocket>();
+
+  /**
+   * Maximum length of time to go between checking whether the interrupted
+   * bit has been set for this thread.
+   */
+  private final int interruptCheckPeriodMs;
+
+  /**
+   * A pair of sockets used to wake up the thread after it has called poll(2).
+   */
+  private final DomainSocket notificationSockets[];
+
+  /**
+   * Whether or not this DomainSocketWatcher is closed.
+   */
+  private boolean closed = false;
+
+  public DomainSocketWatcher(int interruptCheckPeriodMs) throws IOException {
+    if (loadingFailureReason != null) {
+      throw new UnsupportedOperationException(loadingFailureReason);
+    }
+    Preconditions.checkArgument(interruptCheckPeriodMs > 0); // validate first
+    this.interruptCheckPeriodMs = interruptCheckPeriodMs;
+    notificationSockets = DomainSocket.socketpair(); // bad args leak no fds
+    watcherThread.start();
+  }
+
+  /**
+   * Close the DomainSocketWatcher and wait for its thread to terminate.
+   *
+   * If there is more than one close, all but the first will be ignored.
+   */
+  @Override
+  public void close() throws IOException {
+    lock.lock();  // outside the try: finally must only unlock a held lock
+    try {
+      if (closed) return;
+      LOG.info(this + ": closing");
+      closed = true;
+    } finally {
+      lock.unlock();
+    }
+    // Close notificationSockets[0], so that notificationSockets[1] gets an EOF
+    // event.  This will wake up the thread immediately if it is blocked inside
+    // the poll(2) system call.
+    notificationSockets[0].close();
+    // Wait for the watcher thread to terminate.
+    Uninterruptibles.joinUninterruptibly(watcherThread);
+  }
+
+  /**
+   * Add a socket, blocking until its entry has been taken off toAdd.
+   *
+   * @param sock     The socket to add.  It is an error to re-add a socket that
+   *                   we are already watching.
+   * @param handler  The handler to associate with this socket.  This may be
+   *                   called any time after this function is called.
+   */
+  public void add(DomainSocket sock, Handler handler) {
+    // Acquire outside the try so that finally only unlocks a lock we hold.
+    lock.lock();
+    try {
+      checkNotClosed();
+      Entry entry = new Entry(sock, handler);
+      try {
+        sock.refCount.reference();
+      } catch (ClosedChannelException e) {
+        Preconditions.checkArgument(false,
+            "tried to add a closed DomainSocket to " + this);
+      }
+      toAdd.add(entry);
+      kick();
+      while (true) {
+        // Wait uninterruptibly.  Catching InterruptedException and calling
+        // this.interrupt() hit the watcher Thread object, not the caller;
+        // awaitUninterruptibly() keeps the caller's interrupt status set.
+        processedCond.awaitUninterruptibly();
+        if (!toAdd.contains(entry)) {
+          break;
+        }
+        checkNotClosed();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Remove a socket.  Its handler will be called; blocks until processed.
+   *
+   * @param sock     The socket to remove.
+   */
+  public void remove(DomainSocket sock) {
+    lock.lock(); // outside the try, so finally only unlocks a held lock
+    try {
+      checkNotClosed();
+      toRemove.put(sock.fd, sock);
+      kick();
+      while (true) {
+        try {
+          processedCond.await();
+        } catch (InterruptedException e) {
+          this.interrupt();
+        }
+        if (!toRemove.containsKey(sock.fd)) {
+          break;
+        }
+        checkNotClosed();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Wake up the DomainSocketWatcher thread.
+   */
+  private void kick() {
+    try {
+      notificationSockets[0].getOutputStream().write(0);
+    } catch (IOException e) {
+      LOG.error(this + ": error writing to notificationSockets[0]", e);
+    }
+  }
+
+  /**
+   * Check that the DomainSocketWatcher is not closed.
+   * Must be called while holding the lock.
+   */
+  private void checkNotClosed() {
+    Preconditions.checkState(lock.isHeldByCurrentThread());
+    if (closed) {
+      throw new RuntimeException("DomainSocketWatcher is closed.");
+    }
+  }
+
+  private void sendCallback(String caller, TreeMap<Integer, Entry> entries,
+      FdSet fdSet, int fd) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": " + caller + " starting sendCallback for fd " + fd);
+    }
+    Entry entry = entries.get(fd);
+    Preconditions.checkNotNull(entry,
+        this + ": fdSet contained " + fd + ", which we were " +
+        "not tracking.");
+    DomainSocket sock = entry.getDomainSocket();
+    if (entry.getHandler().handle(sock)) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": " + caller + ": closing fd " + fd +
+            " at the request of the handler.");
+      }
+      if (toRemove.remove(fd) != null) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": " + caller + " : sendCallback processed fd " +
+            fd  + " in toRemove.");
+        }
+      }
+      try {
+        sock.refCount.unreferenceCheckClosed();
+      } catch (IOException e) {
+        Preconditions.checkArgument(false,
+            this + ": file descriptor " + sock.fd + " was closed while " +
+            "still in the poll(2) loop.");
+      }
+      IOUtils.cleanup(LOG, sock);
+      entries.remove(fd);
+      fdSet.remove(fd);
+    } else {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": " + caller + ": sendCallback not " +
+            "closing fd " + fd);
+      }
+    }
+  }
+
+  private final Thread watcherThread = new Thread(new Runnable() {
+    @Override
+    public void run() {
+      LOG.info(this + ": starting with interruptCheckPeriodMs = " +
+          interruptCheckPeriodMs);
+      final TreeMap<Integer, Entry> entries = new TreeMap<Integer, Entry>();
+      FdSet fdSet = new FdSet();
+      addNotificationSocket(entries, fdSet);
+      try {
+        while (true) {
+          lock.lock();
+          try {
+            for (int fd : fdSet.getAndClearReadableFds()) {
+              sendCallback("getAndClearReadableFds", entries, fdSet, fd);
+            }
+            if (!(toAdd.isEmpty() && toRemove.isEmpty())) {
+              // Handle pending additions (before pending removes).
+              for (Iterator<Entry> iter = toAdd.iterator(); iter.hasNext(); ) {
+                Entry entry = iter.next();
+                DomainSocket sock = entry.getDomainSocket();
+                Entry prevEntry = entries.put(sock.fd, entry);
+                Preconditions.checkState(prevEntry == null,
+                    this + ": tried to watch a file descriptor that we " +
+                    "were already watching: " + sock);
+                if (LOG.isTraceEnabled()) {
+                  LOG.trace(this + ": adding fd " + sock.fd);
+                }
+                fdSet.add(sock.fd);
+                iter.remove();
+              }
+              // Handle pending removals
+              while (true) {
+                Map.Entry<Integer, DomainSocket> entry = toRemove.firstEntry();
+                if (entry == null) break;
+                sendCallback("handlePendingRemovals",
+                    entries, fdSet, entry.getValue().fd);
+              }
+              processedCond.signalAll();
+            }
+            // Check if the thread should terminate.  Doing this check now is
+            // easier than at the beginning of the loop, since we know toAdd and
+            // toRemove are now empty and processedCond has been notified if it
+            // needed to be.
+            if (closed) {
+              LOG.info(toString() + " thread terminating.");
+              return;
+            }
+            // Check if someone sent our thread an InterruptedException while we
+            // were waiting in poll().
+            if (Thread.interrupted()) {
+              throw new InterruptedException();
+            }
+          } finally {
+            lock.unlock();
+          }
+          doPoll0(interruptCheckPeriodMs, fdSet);
+        }
+      } catch (InterruptedException e) {
+        LOG.info(toString() + " terminating on InterruptedException");
+      } catch (IOException e) {
+        LOG.error(toString() + " terminating on IOException", e);
+      } finally {
+        for (Entry entry : entries.values()) {
+          sendCallback("close", entries, fdSet, entry.getDomainSocket().fd);
+        }
+        entries.clear();
+        fdSet.close();
+      }
+    }
+  });
+
+  private void addNotificationSocket(final TreeMap<Integer, Entry> entries,
+      FdSet fdSet) {
+    entries.put(notificationSockets[1].fd, 
+        new Entry(notificationSockets[1], new NotificationHandler()));
+    try {
+      notificationSockets[1].refCount.reference();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    fdSet.add(notificationSockets[1].fd);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": adding notificationSocket " +
+          notificationSockets[1].fd + ", connected to " +
+          notificationSockets[0].fd);
+    }
+  }
+
+  public String toString() {
+    return "DomainSocketWatcher(" + System.identityHashCode(this) + ")"; 
+  }
+
+  private static native int doPoll0(int maxWaitMs, FdSet readFds)
+      throws IOException;
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CloseableReferenceCount.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CloseableReferenceCount.java?rev=1563362&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CloseableReferenceCount.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CloseableReferenceCount.java Sat Feb  1 02:25:33 2014
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A closeable object that maintains a reference count.
+ *
+ * Once the object is closed, attempting to take a new reference will throw
+ * ClosedChannelException.
+ */
+public class CloseableReferenceCount {
+  /**
+   * Bit mask representing a closed domain socket.
+   */
+  private static final int STATUS_CLOSED_MASK = 1 << 30;
+
+  /**
+   * The status bits.
+   *
+   * Bit 30: 0 = open, 1 = closed.
+   * Bits 29 to 0: the reference count.
+   */
+  private final AtomicInteger status = new AtomicInteger(0);
+
+  public CloseableReferenceCount() { }
+
+  /**
+   * Increment the reference count.
+   *
+   * @throws ClosedChannelException      If the status is closed.
+   */
+  public void reference() throws ClosedChannelException {
+    int curBits = status.incrementAndGet();
+    if ((curBits & STATUS_CLOSED_MASK) != 0) {
+      status.decrementAndGet();
+      throw new ClosedChannelException();
+    }
+  }
+
+  /**
+   * Decrement the reference count.
+   *
+   * @return          True if the object is closed and has no outstanding
+   *                  references.
+   */
+  public boolean unreference() {
+    int newVal = status.decrementAndGet();
+    Preconditions.checkState(newVal != 0xffffffff,
+        "called unreference when the reference count was already at 0.");
+    return newVal == STATUS_CLOSED_MASK;
+  }
+
+  /**
+   * Decrement the reference count, checking to make sure that the
+   * CloseableReferenceCount is not closed.
+   *
+   * @throws AsynchronousCloseException  If the status is closed.
+   */
+  public void unreferenceCheckClosed() throws ClosedChannelException {
+    int newVal = status.decrementAndGet();
+    if ((newVal & STATUS_CLOSED_MASK) != 0) {
+      throw new AsynchronousCloseException();
+    }
+  }
+
+  /**
+   * Return true if the status is currently open.
+   *
+   * @return                 True if the status is currently open.
+   */
+  public boolean isOpen() {
+    return ((status.get() & STATUS_CLOSED_MASK) == 0);
+  }
+
+  /**
+   * Mark the status as closed.
+   *
+   * Once the status is closed, it cannot be reopened.
+   *
+   * @return                         The current reference count.
+   * @throws ClosedChannelException  If someone else closes the object
+   *                                 before we do.
+   */
+  public int setClosed() throws ClosedChannelException {
+    while (true) {
+      int curBits = status.get();
+      if ((curBits & STATUS_CLOSED_MASK) != 0) {
+        throw new ClosedChannelException();
+      }
+      if (status.compareAndSet(curBits, curBits | STATUS_CLOSED_MASK)) {
+        return curBits & (~STATUS_CLOSED_MASK);
+      }
+    }
+  }
+
+  /**
+   * Get the current reference count.
+   *
+   * @return                 The current reference count.
+   */
+  public int getReferenceCount() {
+    return status.get() & (~STATUS_CLOSED_MASK);
+  }
+}

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c?rev=1563362&r1=1563361&r2=1563362&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c Sat Feb  1 02:25:33 2014
@@ -18,6 +18,7 @@
 
 #include "org_apache_hadoop.h"
 #include "org_apache_hadoop_io_nativeio_NativeIO.h"
+#include "org_apache_hadoop_io_nativeio_NativeIO_POSIX.h"
 
 #ifdef UNIX
 #include <assert.h>
@@ -49,6 +50,10 @@
 #include "file_descriptor.h"
 #include "errno_enum.h"
 
+#define MMAP_PROT_READ org_apache_hadoop_io_nativeio_NativeIO_POSIX_MMAP_PROT_READ
+#define MMAP_PROT_WRITE org_apache_hadoop_io_nativeio_NativeIO_POSIX_MMAP_PROT_WRITE
+#define MMAP_PROT_EXEC org_apache_hadoop_io_nativeio_NativeIO_POSIX_MMAP_PROT_EXEC
+
 // the NativeIO$POSIX$Stat inner class and its constructor
 static jclass stat_clazz;
 static jmethodID stat_ctor;
@@ -661,6 +666,39 @@ cleanup:
 #endif
 }
 
+JNIEXPORT jlong JNICALL 
+Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_mmap(
+  JNIEnv *env, jclass clazz, jobject jfd, jint jprot,
+  jboolean jshared, jlong length)
+{
+  void *addr = 0;
+  int prot, flags, fd;
+  
+  prot = ((jprot & MMAP_PROT_READ) ? PROT_READ : 0) |
+         ((jprot & MMAP_PROT_WRITE) ? PROT_WRITE : 0) |
+         ((jprot & MMAP_PROT_EXEC) ? PROT_EXEC : 0);
+  flags = (jshared == JNI_TRUE) ? MAP_SHARED : MAP_PRIVATE;
+  fd = fd_get(env, jfd);
+  addr = mmap(NULL, length, prot, flags, fd, 0);
+  if (addr == MAP_FAILED) {
+    throw_ioe(env, errno);
+  }
+  return (jlong)(intptr_t)addr;
+}
+
+JNIEXPORT void JNICALL 
+Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_munmap(
+  JNIEnv *env, jclass clazz, jlong jaddr, jlong length)
+{
+  void *addr;
+
+  addr = (void*)(intptr_t)jaddr;
+  if (munmap(addr, length) < 0) {
+    throw_ioe(env, errno);
+  }
+}
+
+
 /*
  * static native String getGroupName(int gid);
  *

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.c?rev=1563362&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.c (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/SharedFileDescriptorFactory.c Sat Feb  1 02:25:33 2014
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "org_apache_hadoop.h"
+
+#ifdef UNIX
+
+#include "exception.h"
+#include "file_descriptor.h"
+#include "org_apache_hadoop.h"
+#include "org_apache_hadoop_io_nativeio_SharedFileDescriptorFactory.h"
+
+#include <dirent.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <limits.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+static pthread_mutex_t g_rand_lock = PTHREAD_MUTEX_INITIALIZER;
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_nativeio_SharedFileDescriptorFactory_deleteStaleTemporaryFiles0(
+  JNIEnv *env, jclass clazz, jstring jprefix, jstring jpath)
+{
+  const char *prefix = NULL, *path = NULL;
+  char target[PATH_MAX];
+  jthrowable jthr;
+  DIR *dp = NULL;
+  struct dirent *de;
+
+  prefix = (*env)->GetStringUTFChars(env, jprefix, NULL);
+  if (!prefix) goto done; // exception raised
+  path = (*env)->GetStringUTFChars(env, jpath, NULL);
+  if (!path) goto done; // exception raised
+
+  dp = opendir(path);
+  if (!dp) {
+    int ret = errno;
+    jthr = newIOException(env, "opendir(%s) error %d: %s",
+                          path, ret, terror(ret));
+    (*env)->Throw(env, jthr);
+    goto done;
+  }
+  while ((de = readdir(dp))) {
+    if (strncmp(prefix, de->d_name, strlen(prefix)) == 0) {
+      int ret = snprintf(target, PATH_MAX, "%s/%s", path, de->d_name);
+      if ((0 < ret) && (ret < PATH_MAX)) {
+        unlink(target);
+      }
+    }
+  }
+
+done:
+  if (dp) {
+    closedir(dp);
+  }
+  if (prefix) {
+    (*env)->ReleaseStringUTFChars(env, jprefix, prefix);
+  }
+  if (path) {
+    (*env)->ReleaseStringUTFChars(env, jpath, path);
+  }
+}
+
+/*
+ * Creates "<path>/<prefix>_<random>", unlinks it at once, sizes it to 'length'
+ * bytes and wraps the fd.  Returns NULL with an exception pending on failure.
+ */
+JNIEXPORT jobject JNICALL
+Java_org_apache_hadoop_io_nativeio_SharedFileDescriptorFactory_createDescriptor0(
+  JNIEnv *env, jclass clazz, jstring jprefix, jstring jpath, jint length)
+{
+  const char *prefix = NULL, *path = NULL;
+  char target[PATH_MAX];
+  int ret, fd = -1, rnd;
+  jthrowable jthr;
+  jobject jret = NULL;
+
+  prefix = (*env)->GetStringUTFChars(env, jprefix, NULL);
+  if (!prefix) goto done; // exception raised
+  path = (*env)->GetStringUTFChars(env, jpath, NULL);
+  if (!path) goto done; // exception raised
+
+  while (1) {
+    // Draw a fresh suffix on every attempt; retrying a name that just hit
+    // EEXIST would spin for as long as the other file exists.
+    pthread_mutex_lock(&g_rand_lock);
+    rnd = rand();
+    pthread_mutex_unlock(&g_rand_lock);
+    ret = snprintf(target, PATH_MAX, "%s/%s_%d",
+                   path, prefix, rnd);
+    if (ret < 0) {
+      jthr = newIOException(env, "snprintf error");
+      (*env)->Throw(env, jthr);
+      goto done;
+    } else if (ret >= PATH_MAX) {
+      jthr = newIOException(env, "computed path was too long.");
+      (*env)->Throw(env, jthr);
+      goto done;
+    }
+    fd = open(target, O_CREAT | O_EXCL | O_RDWR, 0700);
+    if (fd >= 0) break; // success
+    ret = errno;
+    // EEXIST: a very rare collision with another DataNode (or process).
+    // EINTR: mostly only possible when opening FIFOs, but let's be thorough.
+    if ((ret == EEXIST) || (ret == EINTR)) continue;
+    jthr = newIOException(env, "open(%s, O_CREAT | O_EXCL | O_RDWR) "
+            "failed: error %d (%s)", target, ret, terror(ret));
+    (*env)->Throw(env, jthr);
+    goto done;
+  }
+  if (unlink(target) < 0) {
+    ret = errno; // capture before any other call can clobber it
+    jthr = newIOException(env, "unlink(%s) failed: error %d (%s)",
+                          target, ret, terror(ret));
+    (*env)->Throw(env, jthr);
+    goto done;
+  }
+  if (ftruncate(fd, length) < 0) {
+    ret = errno;
+    jthr = newIOException(env, "ftruncate(%s, %d) failed: error %d (%s)",
+                          target, length, ret, terror(ret));
+    (*env)->Throw(env, jthr);
+    goto done;
+  }
+  jret = fd_create(env, fd); // throws exception on error.
+
+done:
+  if (prefix) {
+    (*env)->ReleaseStringUTFChars(env, jprefix, prefix);
+  }
+  if (path) {
+    (*env)->ReleaseStringUTFChars(env, jpath, path);
+  }
+  if (!jret && (fd >= 0)) {
+    close(fd); // nothing took ownership of the descriptor
+  }
+  return jret;
+}
+
+#endif

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c?rev=1563362&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c Sat Feb  1 02:25:33 2014
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "config.h"
+#include "exception.h"
+#include "org_apache_hadoop.h"
+#include "org_apache_hadoop_net_unix_DomainSocketWatcher.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <jni.h>
+#include <poll.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/select.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+static jfieldID fd_set_data_fid;
+
+#define FD_SET_DATA_MIN_SIZE 2
+
+struct fd_set_data {
+  /**
+   * Number of fds we have allocated space for.
+   */
+  int alloc_size;
+
+  /**
+   * Number of fds actually in use.
+   */
+  int used_size;
+
+  /**
+   * Beginning of pollfd data.
+   */
+  struct pollfd pollfd[0];
+};
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocketWatcher_anchorNative(
+JNIEnv *env, jclass clazz)
+{
+  jclass fd_set_class;
+
+  fd_set_class = (*env)->FindClass(env,
+          "org/apache/hadoop/net/unix/DomainSocketWatcher$FdSet");
+  if (!fd_set_class) return; // exception raised
+  fd_set_data_fid = (*env)->GetFieldID(env, fd_set_class, "data", "J");
+  if (!fd_set_data_fid) return; // exception raised
+}
+
+JNIEXPORT jlong JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocketWatcher_00024FdSet_alloc0(
+JNIEnv *env, jclass clazz)
+{
+  struct fd_set_data *sd;
+
+  sd = calloc(1, sizeof(struct fd_set_data) +
+              (sizeof(struct pollfd) * FD_SET_DATA_MIN_SIZE));
+  if (!sd) {
+    (*env)->Throw(env, newRuntimeException(env, "out of memory allocating "
+                                            "DomainSocketWatcher#FdSet"));
+    return 0L;
+  }
+  sd->alloc_size = FD_SET_DATA_MIN_SIZE;
+  sd->used_size = 0;
+  return (jlong)(intptr_t)sd;
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocketWatcher_00024FdSet_add(
+JNIEnv *env, jobject obj, jint fd)
+{
+  struct fd_set_data *sd, *nd;
+  struct pollfd *pollfd;
+
+  sd = (struct fd_set_data*)(intptr_t)(*env)->
+    GetLongField(env, obj, fd_set_data_fid);
+  if (sd->used_size + 1 > sd->alloc_size) {
+    nd = realloc(sd, sizeof(struct fd_set_data) +
+            (sizeof(struct pollfd) * sd->alloc_size * 2));
+    if (!nd) {
+      (*env)->Throw(env, newRuntimeException(env, "out of memory adding "
+            "another fd to DomainSocketWatcher#FdSet.  we have %d already",
+            sd->alloc_size));
+      return;
+    }
+    nd->alloc_size = nd->alloc_size * 2;
+    (*env)->SetLongField(env, obj, fd_set_data_fid, (jlong)(intptr_t)nd);
+    sd = nd;
+  }
+  pollfd = &sd->pollfd[sd->used_size];
+  sd->used_size++;
+  pollfd->fd = fd;
+  pollfd->events = POLLIN;
+  pollfd->revents = 0;
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocketWatcher_00024FdSet_remove(
+JNIEnv *env, jobject obj, jint fd)
+{
+  struct fd_set_data *sd;
+  struct pollfd *pollfd, *last_pollfd;
+  int used_size, i;
+
+  sd = (struct fd_set_data*)(intptr_t)(*env)->
+      GetLongField(env, obj, fd_set_data_fid);
+  used_size = sd->used_size;
+  for (i = 0; i < used_size; i++) {
+    pollfd = sd->pollfd + i;
+    if (pollfd->fd == fd) break;
+  }
+  if (i == used_size) {
+    (*env)->Throw(env, newRuntimeException(env, "failed to remove fd %d "
+          "from the FdSet because it was never present.", fd));
+    return;
+  }
+  last_pollfd = sd->pollfd + (used_size - 1);
+  if (used_size > 1) {
+    // Move last pollfd to the new empty slot if needed
+    pollfd->fd = last_pollfd->fd;
+    pollfd->events = last_pollfd->events;
+    pollfd->revents = last_pollfd->revents;
+  }
+  memset(last_pollfd, 0, sizeof(struct pollfd));
+  sd->used_size--;
+}
+
+/* Returns an int[] of the fds whose revents had POLLIN set, resetting revents. */
+JNIEXPORT jobject JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocketWatcher_00024FdSet_getAndClearReadableFds(
+JNIEnv *env, jobject obj)
+{
+  int *carr = NULL;
+  jobject jarr = NULL;
+  struct fd_set_data *sd;
+  int used_size, num_readable = 0, i, j;
+  jthrowable jthr = NULL;
+
+  sd = (struct fd_set_data*)(intptr_t)(*env)->
+      GetLongField(env, obj, fd_set_data_fid);
+  used_size = sd->used_size;
+  for (i = 0; i < used_size; i++) {
+    if (sd->pollfd[i].revents & POLLIN) {
+      num_readable++;
+    } else {
+      sd->pollfd[i].revents = 0;
+    }
+  }
+  if (num_readable > 0) {
+    carr = malloc(sizeof(int) * num_readable);
+    if (!carr) {
+      jthr = newRuntimeException(env, "failed to allocate a temporary array "
+            "of %d ints", num_readable);
+      goto done;
+    }
+    j = 0;
+    for (i = 0; ((i < used_size) && (j < num_readable)); i++) {
+      if (sd->pollfd[i].revents & POLLIN) {
+        carr[j] = sd->pollfd[i].fd;
+        j++;
+        sd->pollfd[i].revents = 0;
+      }
+    }
+    if (j != num_readable) {
+      jthr = newRuntimeException(env, "failed to fill entire carr "
+            "array of size %d: only filled %d elements", num_readable, j);
+      goto done;
+    }
+  }
+  jarr = (*env)->NewIntArray(env, num_readable);
+  if (!jarr) {
+    jthr = (*env)->ExceptionOccurred(env);
+    (*env)->ExceptionClear(env);
+    goto done;
+  }
+  if (num_readable > 0) {
+    (*env)->SetIntArrayRegion(env, jarr, 0, num_readable, carr);
+    jthr = (*env)->ExceptionOccurred(env);
+    if (jthr) (*env)->ExceptionClear(env); // re-thrown at done
+  }
+
+done:
+  free(carr);
+  if (jthr) {
+    // Raise it: a silent NULL would only show up as an NPE in the Java caller.
+    if (jarr) (*env)->DeleteLocalRef(env, jarr);
+    jarr = NULL;
+    (*env)->Throw(env, jthr);
+  }
+  return jarr;
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocketWatcher_00024FdSet_close(
+JNIEnv *env, jobject obj)
+{
+  struct fd_set_data *sd;
+
+  sd = (struct fd_set_data*)(intptr_t)(*env)->
+      GetLongField(env, obj, fd_set_data_fid);
+  if (sd) {
+    free(sd);
+    (*env)->SetLongField(env, obj, fd_set_data_fid, 0L);
+  }
+}
+
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocketWatcher_doPoll0(
+JNIEnv *env, jclass clazz, jint checkMs, jobject fdSet)
+{
+  struct fd_set_data *sd;
+  int ret, err;
+
+  sd = (struct fd_set_data*)(intptr_t)(*env)->
+      GetLongField(env, fdSet, fd_set_data_fid);
+  ret = poll(sd->pollfd, sd->used_size, checkMs);
+  if (ret >= 0) {
+    return ret;
+  }
+  err = errno;
+  if (err != EINTR) { // treat EINTR as 0 fds ready
+    (*env)->Throw(env, newIOException(env,
+            "poll(2) failed with error code %d: %s", err, terror(err)));
+  }
+  return 0;
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestSharedFileDescriptorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestSharedFileDescriptorFactory.java?rev=1563362&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestSharedFileDescriptorFactory.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestSharedFileDescriptorFactory.java Sat Feb  1 02:25:33 2014
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.nativeio;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+import org.apache.commons.lang.SystemUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+public class TestSharedFileDescriptorFactory {
+  static final Log LOG = LogFactory.getLog(TestSharedFileDescriptorFactory.class);
+
+  private static final File TEST_BASE =
+      new File(System.getProperty("test.build.data", "/tmp"));
+
+  @Test(timeout=10000)
+  public void testReadAndWrite() throws Exception {
+    Assume.assumeTrue(NativeIO.isAvailable());
+    Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
+    File path = new File(TEST_BASE, "testReadAndWrite");
+    path.mkdirs();
+    SharedFileDescriptorFactory factory =
+        new SharedFileDescriptorFactory("woot_", path.getAbsolutePath());
+    FileInputStream inStream = factory.createDescriptor(4096);
+    FileOutputStream outStream = new FileOutputStream(inStream.getFD());
+    outStream.write(101);
+    inStream.getChannel().position(0);
+    Assert.assertEquals(101, inStream.read());
+    inStream.close();
+    outStream.close();
+    FileUtil.fullyDelete(path);
+  }
+
+  static private void createTempFile(String path) throws Exception {
+    FileOutputStream fos = new FileOutputStream(path);
+    fos.write(101);
+    fos.close();
+  }
+  
+  @Test(timeout=10000)
+  public void testCleanupRemainders() throws Exception {
+    Assume.assumeTrue(NativeIO.isAvailable());
+    Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
+    File path = new File(TEST_BASE, "testCleanupRemainders");
+    path.mkdirs();
+    String remainder1 = path.getAbsolutePath() + 
+        Path.SEPARATOR + "woot2_remainder1";
+    String remainder2 = path.getAbsolutePath() +
+        Path.SEPARATOR + "woot2_remainder2";
+    createTempFile(remainder1);
+    createTempFile(remainder2);
+    new SharedFileDescriptorFactory("woot2_", path.getAbsolutePath());
+    // creating the SharedFileDescriptorFactory should have removed 
+    // the remainders
+    Assert.assertFalse(new File(remainder1).exists());
+    Assert.assertFalse(new File(remainder2).exists());
+    FileUtil.fullyDelete(path);
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java?rev=1563362&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java Sat Feb  1 02:25:33 2014
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net.unix;
+
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+/**
+ * Tests for {@link DomainSocketWatcher}.  All tests are skipped when
+ * {@link DomainSocket#getLoadingFailureReason()} returns a non-null reason.
+ */
+public class TestDomainSocketWatcher {
+  static final Log LOG = LogFactory.getLog(TestDomainSocketWatcher.class);
+
+  /**
+   * Skip every test unless DomainSocket reports no loading failure.
+   */
+  @Before
+  public void before() {
+    Assume.assumeTrue(DomainSocket.getLoadingFailureReason() == null);
+  }
+
+  /**
+   * Test that we can create a DomainSocketWatcher and then shut it down.
+   */
+  @Test(timeout=60000)
+  public void testCreateShutdown() throws Exception {
+    DomainSocketWatcher watcher = new DomainSocketWatcher(10000000);
+    watcher.close();
+  }
+
+  /**
+   * Test that we can get notifications out of a DomainSocketWatcher.
+   */
+  @Test(timeout=180000)
+  public void testDeliverNotifications() throws Exception {
+    DomainSocketWatcher watcher = new DomainSocketWatcher(10000000);
+    try {
+      DomainSocket[] pair = DomainSocket.socketpair();
+      final CountDownLatch latch = new CountDownLatch(1);
+      watcher.add(pair[1], new DomainSocketWatcher.Handler() {
+        @Override
+        public boolean handle(DomainSocket sock) {
+          latch.countDown();
+          return true;
+        }
+      });
+      // Closing one end is expected to trigger the handler registered on
+      // the other end, which releases the latch.
+      pair[0].close();
+      latch.await();
+    } finally {
+      // Shut the watcher down even when the test fails part-way through.
+      watcher.close();
+    }
+  }
+
+  /**
+   * Test that a java interruption can stop the watcher thread
+   */
+  @Test(timeout=60000)
+  public void testInterruption() throws Exception {
+    DomainSocketWatcher watcher = new DomainSocketWatcher(10);
+    watcher.interrupt();
+    Uninterruptibles.joinUninterruptibly(watcher);
+  }
+
+  /**
+   * Stress test.  One thread registers SOCKET_NUM socket pairs with the
+   * watcher while a second thread repeatedly picks a registered pair at
+   * random and either closes its unwatched end or removes its watched end
+   * from the watcher, until the handler has run SOCKET_NUM times.  A failure
+   * in either worker thread fails the test.
+   */
+  @Test(timeout=300000)
+  public void testStress() throws Exception {
+    final int SOCKET_NUM = 250;
+    final ReentrantLock lock = new ReentrantLock();
+    final DomainSocketWatcher watcher = new DomainSocketWatcher(10000000);
+    final ArrayList<DomainSocket[]> pairs = new ArrayList<DomainSocket[]>();
+    // Failures raised by the worker threads; guarded by lock.
+    final ArrayList<Throwable> failures = new ArrayList<Throwable>();
+    final AtomicInteger handled = new AtomicInteger(0);
+
+    final Thread adderThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          for (int i = 0; i < SOCKET_NUM; i++) {
+            DomainSocket[] pair = DomainSocket.socketpair();
+            watcher.add(pair[1], new DomainSocketWatcher.Handler() {
+              @Override
+              public boolean handle(DomainSocket sock) {
+                handled.incrementAndGet();
+                return true;
+              }
+            });
+            lock.lock();
+            try {
+              pairs.add(pair);
+            } finally {
+              lock.unlock();
+            }
+          }
+        } catch (Throwable e) {
+          LOG.error("adder thread failed", e);
+          recordFailure(lock, failures, e);
+        }
+      }
+    });
+
+    final Thread removerThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        final Random random = new Random();
+        try {
+          while (handled.get() != SOCKET_NUM) {
+            lock.lock();
+            try {
+              if (!failures.isEmpty()) {
+                // The adder died, so handled can no longer be relied on to
+                // reach SOCKET_NUM; stop instead of spinning until timeout.
+                return;
+              }
+              if (!pairs.isEmpty()) {
+                int idx = random.nextInt(pairs.size());
+                DomainSocket[] pair = pairs.remove(idx);
+                if (random.nextBoolean()) {
+                  pair[0].close();
+                } else {
+                  // Presumably remove() also leads to a handler invocation,
+                  // since the loop exit depends on it -- TODO confirm.
+                  // NOTE(review): pair[0] is never closed by the test on
+                  // this branch; confirm that is intended.
+                  watcher.remove(pair[1]);
+                }
+              }
+            } finally {
+              lock.unlock();
+            }
+          }
+        } catch (Throwable e) {
+          LOG.error("remover thread failed", e);
+          recordFailure(lock, failures, e);
+        }
+      }
+    });
+
+    try {
+      adderThread.start();
+      removerThread.start();
+      Uninterruptibles.joinUninterruptibly(adderThread);
+      Uninterruptibles.joinUninterruptibly(removerThread);
+    } finally {
+      watcher.close();
+    }
+
+    // An exception thrown on a worker thread does not fail the test by
+    // itself, so rethrow the first recorded failure on the test thread.
+    lock.lock();
+    try {
+      if (!failures.isEmpty()) {
+        throw new RuntimeException("testStress worker thread failed",
+            failures.get(0));
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Appends a worker-thread failure to the shared list while holding the
+   * lock that guards it.
+   *
+   * @param lock      lock guarding the failures list
+   * @param failures  list collecting worker-thread failures
+   * @param failure   the failure to record
+   */
+  private static void recordFailure(ReentrantLock lock,
+      ArrayList<Throwable> failures, Throwable failure) {
+    lock.lock();
+    try {
+      failures.add(failure);
+    } finally {
+      lock.unlock();
+    }
+  }
+}