Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/09/28 00:51:13 UTC

svn commit: r1527113 - in /hadoop/common/trunk/hadoop-common-project/hadoop-common/src: main/java/org/apache/hadoop/fs/ main/java/org/apache/hadoop/io/ main/java/org/apache/hadoop/util/ test/java/org/apache/hadoop/util/

Author: cnauroth
Date: Fri Sep 27 22:51:12 2013
New Revision: 1527113

URL: http://svn.apache.org/r1527113
Log:
HDFS-5260. Merge zero-copy memory-mapped HDFS client reads to trunk and branch-2. Contributed by Chris Nauroth.

Added:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferUtil.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HasEnhancedByteBufferAccess.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ReadOption.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyUnavailableException.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IdentityHashStore.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestIdentityHashStore.java
Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferUtil.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferUtil.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferUtil.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.ByteBufferPool;
+
+import com.google.common.base.Preconditions;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class ByteBufferUtil {
+
+  /**
+   * Determine if a stream can do a byte buffer read via read(ByteBuffer buf).
+   */
+  private static boolean streamHasByteBufferRead(InputStream stream) {
+    if (!(stream instanceof ByteBufferReadable)) {
+      return false;
+    }
+    if (!(stream instanceof FSDataInputStream)) {
+      return true;
+    }
+    return ((FSDataInputStream)stream).getWrappedStream() 
+        instanceof ByteBufferReadable;
+  }
+
+  /**
+   * Perform a fallback read.
+   */
+  public static ByteBuffer fallbackRead(
+      InputStream stream, ByteBufferPool bufferPool, int maxLength)
+          throws IOException {
+    if (bufferPool == null) {
+      throw new UnsupportedOperationException("zero-copy reads " +
+          "were not available, and you did not provide a fallback " +
+          "ByteBufferPool.");
+    }
+    boolean useDirect = streamHasByteBufferRead(stream);
+    ByteBuffer buffer = bufferPool.getBuffer(useDirect, maxLength);
+    if (buffer == null) {
+      throw new UnsupportedOperationException("zero-copy reads " +
+          "were not available, and the ByteBufferPool did not provide " +
+          "us with " + (useDirect ? "a direct" : "an indirect") +
+          "buffer.");
+    }
+    Preconditions.checkState(buffer.capacity() > 0);
+    Preconditions.checkState(buffer.isDirect() == useDirect);
+    maxLength = Math.min(maxLength, buffer.capacity());
+    boolean success = false;
+    try {
+      if (useDirect) {
+        buffer.clear();
+        buffer.limit(maxLength);
+        ByteBufferReadable readable = (ByteBufferReadable)stream;
+        int totalRead = 0;
+        while (true) {
+          if (totalRead >= maxLength) {
+            success = true;
+            break;
+          }
+          int nRead = readable.read(buffer);
+          if (nRead < 0) {
+            if (totalRead > 0) {
+              success = true;
+            }
+            break;
+          }
+          totalRead += nRead;
+        }
+        buffer.flip();
+      } else {
+        buffer.clear();
+        int nRead = stream.read(buffer.array(),
+            buffer.arrayOffset(), maxLength);
+        if (nRead >= 0) {
+          buffer.limit(nRead);
+          success = true;
+        }
+      }
+    } finally {
+      if (!success) {
+        // If we got an error while reading, or if we are at EOF, we 
+        // don't need the buffer any more.  We can give it back to the
+        // bufferPool.
+        bufferPool.putBuffer(buffer);
+        buffer = null;
+      }
+    }
+    return buffer;
+  }
+}
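
For context, here is a minimal sketch of calling the new fallbackRead helper directly (illustrative only, not part of this commit; the ByteArrayInputStream source is just a stand-in for any InputStream):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.ByteBufferUtil;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;

public class FallbackReadSketch {
  public static void main(String[] args) throws IOException {
    ByteBufferPool pool = new ElasticByteBufferPool();
    InputStream stream =
        new ByteArrayInputStream("hello, world".getBytes("UTF-8"));
    // This stream is not ByteBufferReadable, so fallbackRead takes the
    // heap-buffer path and reads up to 4096 bytes into a pooled buffer.
    ByteBuffer buf = ByteBufferUtil.fallbackRead(stream, pool, 4096);
    if (buf != null) {              // null is returned only at EOF
      byte[] data = new byte[buf.remaining()];
      buf.get(data);
      pool.putBuffer(buf);          // return the buffer to the pool when done
    }
  }
}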

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java?rev=1527113&r1=1527112&r2=1527113&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java Fri Sep 27 22:51:12 2013
@@ -1,4 +1,5 @@
 /**
+ * 
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,17 +20,29 @@ package org.apache.hadoop.fs;
 
 import java.io.*;
 import java.nio.ByteBuffer;
+import java.util.EnumSet;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.fs.ByteBufferUtil;
+import org.apache.hadoop.util.IdentityHashStore;
 
 /** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
  * and buffers input through a {@link BufferedInputStream}. */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class FSDataInputStream extends DataInputStream
-    implements Seekable, PositionedReadable, Closeable,
-    ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead {
+    implements Seekable, PositionedReadable, Closeable, 
+      ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
+      HasEnhancedByteBufferAccess {
+  /**
+   * Map ByteBuffers that we have handed out to readers to the
+   * ByteBufferPool objects they came from, so releaseBuffer can return them.
+   */
+  private final IdentityHashStore<ByteBuffer, ByteBufferPool>
+    extendedReadBuffers
+      = new IdentityHashStore<ByteBuffer, ByteBufferPool>(0);
 
   public FSDataInputStream(InputStream in)
     throws IOException {
@@ -167,4 +180,45 @@ public class FSDataInputStream extends D
           "support setting the drop-behind caching setting.");
     }
   }
+
+  @Override
+  public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
+      EnumSet<ReadOption> opts) 
+          throws IOException, UnsupportedOperationException {
+    try {
+      return ((HasEnhancedByteBufferAccess)in).read(bufferPool,
+          maxLength, opts);
+    }
+    catch (ClassCastException e) {
+      ByteBuffer buffer = ByteBufferUtil.
+          fallbackRead(this, bufferPool, maxLength);
+      if (buffer != null) {
+        extendedReadBuffers.put(buffer, bufferPool);
+      }
+      return buffer;
+    }
+  }
+
+  private static final EnumSet<ReadOption> EMPTY_READ_OPTIONS_SET =
+      EnumSet.noneOf(ReadOption.class);
+
+  public final ByteBuffer read(ByteBufferPool bufferPool, int maxLength)
+          throws IOException, UnsupportedOperationException {
+    return read(bufferPool, maxLength, EMPTY_READ_OPTIONS_SET);
+  }
+  
+  @Override
+  public void releaseBuffer(ByteBuffer buffer) {
+    try {
+      ((HasEnhancedByteBufferAccess)in).releaseBuffer(buffer);
+    }
+    catch (ClassCastException e) {
+      ByteBufferPool bufferPool = extendedReadBuffers.remove(buffer);
+      if (bufferPool == null) {
+        throw new IllegalArgumentException("tried to release a buffer " +
+            "that was not created by this stream.");
+      }
+      bufferPool.putBuffer(buffer);
+    }
+  }
 }
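
For context, the intended calling pattern for the new enhanced read API looks roughly like this (a sketch, not part of this commit; the path and FileSystem setup are hypothetical):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;

public class EnhancedReadSketch {
  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.get(new Configuration());
    ByteBufferPool pool = new ElasticByteBufferPool();
    FSDataInputStream in = fs.open(new Path("/tmp/example"));  // hypothetical
    try {
      ByteBuffer buf;
      // read() returns null only at EOF; if the wrapped stream cannot do a
      // zero-copy read, FSDataInputStream falls back to the pool.
      while ((buf = in.read(pool, 1024 * 1024,
          EnumSet.of(ReadOption.SKIP_CHECKSUMS))) != null) {
        // ... consume buf.remaining() bytes ...
        in.releaseBuffer(buf);      // every returned buffer must be released
      }
    } finally {
      in.close();
    }
  }
}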

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java?rev=1527113&r1=1527112&r2=1527113&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java Fri Sep 27 22:51:12 2013
@@ -18,9 +18,11 @@
 package org.apache.hadoop.fs;
 
 import java.io.*;
+import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.ZeroCopyUnavailableException;
 
 /****************************************************************
  * FSInputStream is a generic old InputStream with a little bit

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HasEnhancedByteBufferAccess.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HasEnhancedByteBufferAccess.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HasEnhancedByteBufferAccess.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HasEnhancedByteBufferAccess.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.ByteBufferPool;
+
+/**
+ * FSDataInputStreams implement this interface to provide enhanced
+ * byte buffer access.  Usually this takes the form of mmap support.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface HasEnhancedByteBufferAccess {
+  /**
+   * Get a ByteBuffer containing file data.
+   *
+   * This ByteBuffer may come from the stream itself, via a call like mmap,
+   * or it may come from the ByteBufferPool which is passed in as the
+   * factory argument.
+   *
+   * @param factory
+   *            If this is non-null, it will be used to create a fallback
+   *            ByteBuffer when the stream itself cannot create one.
+   * @param maxLength
+   *            The maximum length of buffer to return.  We may return a buffer
+   *            which is shorter than this.
+   * @param opts
+   *            Options to use when reading.
+   *
+   * @return
+   *            We will return null on EOF (and only on EOF).
+   *            Otherwise, we will return a direct ByteBuffer containing at
+   *            least one byte.  You must free this ByteBuffer when you are 
+   *            done with it by calling releaseBuffer on it.
+   *            The buffer will continue to be readable until it is released 
+   *            in this manner.  However, the input stream's close method may
+   *            warn about unclosed buffers.
+   * @throws IOException
+   *            if there was an error reading.
+   * @throws UnsupportedOperationException
+   *            if factory was null, and we needed an external byte
+   *            buffer.  UnsupportedOperationException will never be
+   *            thrown unless the factory argument is null.
+   */
+  public ByteBuffer read(ByteBufferPool factory, int maxLength,
+      EnumSet<ReadOption> opts)
+          throws IOException, UnsupportedOperationException;
+
+  /**
+   * Release a ByteBuffer which was created by the enhanced ByteBuffer read
+   * function. You must not continue using the ByteBuffer after calling this 
+   * function.
+   *
+   * @param buffer
+   *            The ByteBuffer to release.
+   */
+  public void releaseBuffer(ByteBuffer buffer);
+}
\ No newline at end of file
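
A minimal sketch of a stream implementing this interface without mmap support, delegating to the fallback path (illustrative only, not part of this commit; for brevity it tracks just the most recently used pool):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.EnumSet;

import org.apache.hadoop.fs.ByteBufferUtil;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.io.ByteBufferPool;

class SimpleEnhancedStream extends ByteArrayInputStream
    implements HasEnhancedByteBufferAccess {
  private ByteBufferPool lastPool;  // pool that supplied the last buffer

  SimpleEnhancedStream(byte[] data) {
    super(data);
  }

  @Override
  public ByteBuffer read(ByteBufferPool factory, int maxLength,
      EnumSet<ReadOption> opts) throws IOException {
    // No mmap here, so always satisfy the read from the factory's buffers.
    ByteBuffer buf = ByteBufferUtil.fallbackRead(this, factory, maxLength);
    if (buf != null) {
      lastPool = factory;
    }
    return buf;
  }

  @Override
  public void releaseBuffer(ByteBuffer buffer) {
    lastPool.putBuffer(buffer);     // hand the buffer back to its pool
  }
}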

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ReadOption.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ReadOption.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ReadOption.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ReadOption.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Options that can be used when reading from a FileSystem.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public enum ReadOption {
+  /**
+   * Skip checksums when reading.  This option may be useful when reading a file
+   * format that has built-in checksums, or for testing purposes.
+   */
+  SKIP_CHECKSUMS,
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyUnavailableException.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyUnavailableException.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyUnavailableException.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyUnavailableException.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+public class ZeroCopyUnavailableException extends IOException {
+  private static final long serialVersionUID = 0L;
+
+  public ZeroCopyUnavailableException(String message) {
+    super(message);
+  }
+
+  public ZeroCopyUnavailableException(String message, Exception e) {
+    super(message, e);
+  }
+
+  public ZeroCopyUnavailableException(Exception e) {
+    super(e);
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public interface ByteBufferPool {
+  /**
+   * Get a new ByteBuffer.  The pool can provide this by
+   * removing a buffer from its internal cache, or by allocating a
+   * new buffer.
+   *
+   * @param direct     Whether the buffer should be direct.
+   * @param length     The requested length of the buffer.
+   * @return           A new ByteBuffer.  It will be direct if and only if
+   *                   the direct parameter is true.  Its capacity can be
+   *                   less than what was requested, but must be at least
+   *                   1 byte.
+   */
+  ByteBuffer getBuffer(boolean direct, int length);
+
+  /**
+   * Release a buffer back to the pool.
+   * The pool may choose to put this buffer into its cache.
+   *
+   * @param buffer    a ByteBuffer to return to the pool
+   */
+  void putBuffer(ByteBuffer buffer);
+}
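
The simplest possible implementation of this contract is a pool that never caches (a sketch, not part of this commit; the pooled implementation the commit actually adds is ElasticByteBufferPool below):

import java.nio.ByteBuffer;

import org.apache.hadoop.io.ByteBufferPool;

class NonCachingByteBufferPool implements ByteBufferPool {
  @Override
  public ByteBuffer getBuffer(boolean direct, int length) {
    // Always allocate fresh; a caching pool would check its cache first.
    return direct ? ByteBuffer.allocateDirect(length)
                  : ByteBuffer.allocate(length);
  }

  @Override
  public void putBuffer(ByteBuffer buffer) {
    // Drop the buffer and let the garbage collector reclaim it.
  }
}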

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io;
+
+import com.google.common.collect.ComparisonChain;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This is a simple ByteBufferPool which just creates ByteBuffers as needed.
+ * It also caches ByteBuffers after they're released.  It will always return
+ * the smallest cached buffer with at least the capacity you request.
+ * We don't try to do anything clever here, such as limiting the maximum
+ * cache size.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public final class ElasticByteBufferPool implements ByteBufferPool {
+  private static final class Key implements Comparable<Key> {
+    private final int capacity;
+    private final long insertionTime;
+
+    Key(int capacity, long insertionTime) {
+      this.capacity = capacity;
+      this.insertionTime = insertionTime;
+    }
+
+    @Override
+    public int compareTo(Key other) {
+      return ComparisonChain.start().
+          compare(capacity, other.capacity).
+          compare(insertionTime, other.insertionTime).
+          result();
+    }
+
+    @Override
+    public boolean equals(Object rhs) {
+      if (!(rhs instanceof Key)) {
+        return false;
+      }
+      return compareTo((Key)rhs) == 0;
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder().
+          append(capacity).
+          append(insertionTime).
+          toHashCode();
+    }
+  }
+
+  private final TreeMap<Key, ByteBuffer> buffers =
+      new TreeMap<Key, ByteBuffer>();
+
+  private final TreeMap<Key, ByteBuffer> directBuffers =
+      new TreeMap<Key, ByteBuffer>();
+
+  private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
+    return direct ? directBuffers : buffers;
+  }
+  
+  @Override
+  public synchronized ByteBuffer getBuffer(boolean direct, int length) {
+    TreeMap<Key, ByteBuffer> tree = getBufferTree(direct);
+    Map.Entry<Key, ByteBuffer> entry =
+        tree.ceilingEntry(new Key(length, 0));
+    if (entry == null) {
+      return direct ? ByteBuffer.allocateDirect(length) :
+                      ByteBuffer.allocate(length);
+    }
+    tree.remove(entry.getKey());
+    return entry.getValue();
+  }
+
+  @Override
+  public synchronized void putBuffer(ByteBuffer buffer) {
+    TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
+    while (true) {
+      Key key = new Key(buffer.capacity(), System.nanoTime());
+      if (!tree.containsKey(key)) {
+        tree.put(key, buffer);
+        return;
+      }
+      // Buffers are indexed by (capacity, time).
+      // If our key is not unique on the first try, we try again, since the
+      // time will be different.  Since we use nanoseconds, it's pretty
+      // unlikely that we'll loop even once, unless the system clock has a
+      // poor granularity.
+    }
+  }
+}
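
A short sketch of the caching behavior (illustrative only, not part of this commit): a released buffer is reused for any later request it can satisfy, because getBuffer picks the smallest cached buffer with sufficient capacity.

import java.nio.ByteBuffer;

import org.apache.hadoop.io.ElasticByteBufferPool;

public class ElasticPoolSketch {
  public static void main(String[] args) {
    ElasticByteBufferPool pool = new ElasticByteBufferPool();
    ByteBuffer first = pool.getBuffer(false, 1024);   // freshly allocated
    pool.putBuffer(first);                            // cached on release
    // The smallest cached buffer with capacity >= 512 is the one we just
    // returned, so the same instance comes back.
    ByteBuffer second = pool.getBuffer(false, 512);
    System.out.println(first == second);              // prints true
  }
}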

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IdentityHashStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IdentityHashStore.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IdentityHashStore.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IdentityHashStore.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * The IdentityHashStore stores (key, value) mappings in an array.
+ * It is similar to java.util.Hashtable, but much more lightweight.
+ * Neither inserting nor removing an element ever leads to any garbage
+ * getting created (assuming the array doesn't need to be enlarged).
+ *
+ * Unlike Hashtable, it compares keys using
+ * {@link System#identityHashCode(Object)} and the identity operator.
+ * This is useful for types like ByteBuffer which have expensive hashCode
+ * and equals operators.
+ *
+ * We use linear probing to resolve collisions.  This avoids the need for
+ * the overhead of linked list data structures.  It also means that it is
+ * expensive to attempt to remove an element that isn't there, since we
+ * have to look at the entire array to be sure that it doesn't exist.
+ *
+ * @param <K>    The key type to use.
+ * @param <V>    The value type to use.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+@SuppressWarnings("unchecked")
+public final class IdentityHashStore<K, V> {
+  /**
+   * Even elements are keys; odd elements are values.
+   * The array has size 4 * capacity, where capacity is always a power of two.
+   */
+  private Object buffer[];
+
+  private int numInserted = 0;
+
+  private int capacity;
+
+  /**
+   * The default maxCapacity value to use.
+   */
+  private static final int DEFAULT_MAX_CAPACITY = 2;
+
+  public IdentityHashStore(int capacity) {
+    Preconditions.checkArgument(capacity >= 0);
+    if (capacity == 0) {
+      this.capacity = 0;
+      this.buffer = null;
+    } else {
+      // Round the capacity we need up to a power of 2.
+      realloc((int)Math.pow(2,
+          Math.ceil(Math.log(capacity) / Math.log(2))));
+    }
+  }
+
+  private void realloc(int newCapacity) {
+    Preconditions.checkArgument(newCapacity > 0);
+    Object prevBuffer[] = buffer;
+    this.capacity = newCapacity;
+    // Each element takes two array slots -- one for the key, 
+    // and another for the value.  We also want a load factor 
+    // of 0.50.  Combine those together and you get 4 * newCapacity.
+    this.buffer = new Object[4 * newCapacity];
+    this.numInserted = 0;
+    if (prevBuffer != null) {
+      for (int i = 0; i < prevBuffer.length; i += 2) {
+        if (prevBuffer[i] != null) {
+          putInternal(prevBuffer[i], prevBuffer[i + 1]);
+        }
+      }
+    }
+  }
+
+  private void putInternal(Object k, Object v) {
+    int hash = System.identityHashCode(k);
+    final int numEntries = buffer.length / 2;
+    int index = hash % numEntries;
+    while (true) {
+      if (buffer[2 * index] == null) {
+        buffer[2 * index] = k;
+        buffer[1 + (2 * index)] = v;
+        numInserted++;
+        return;
+      }
+      index = (index + 1) % numEntries;
+    }
+  }
+
+  /**
+   * Add a new (key, value) mapping.
+   *
+   * Inserting a new (key, value) never overwrites a previous one.
+   * In other words, you can insert the same key multiple times and it will
+   * lead to multiple entries.
+   */
+  public void put(K k, V v) {
+    Preconditions.checkNotNull(k);
+    if (buffer == null) {
+      realloc(DEFAULT_MAX_CAPACITY);
+    } else if (numInserted + 1 > capacity) {
+      realloc(capacity * 2);
+    }
+    putInternal(k, v);
+  }
+
+  private int getElementIndex(K k) {
+    if (buffer == null) {
+      return -1;
+    }
+    final int numEntries = buffer.length / 2;
+    int hash = System.identityHashCode(k);
+    int index = hash % numEntries;
+    int firstIndex = index;
+    do {
+      if (buffer[2 * index] == k) {
+        return index;
+      }
+      index = (index + 1) % numEntries;
+    } while (index != firstIndex);
+    return -1;
+  }
+
+  /**
+   * Retrieve a value associated with a given key.
+   */
+  public V get(K k) {
+    int index = getElementIndex(k);
+    if (index < 0) {
+      return null;
+    }
+    return (V)buffer[1 + (2 * index)];
+  }
+
+  /**
+   * Retrieve a value associated with a given key, and delete the
+   * relevant entry.
+   */
+  public V remove(K k) {
+    int index = getElementIndex(k);
+    if (index < 0) {
+      return null;
+    }
+    V val = (V)buffer[1 + (2 * index)];
+    buffer[2 * index] = null;
+    buffer[1 + (2 * index)] = null;
+    numInserted--;
+    return val;
+  }
+
+  public boolean isEmpty() {
+    return numInserted == 0;
+  }
+
+  public int numElements() {
+    return numInserted;
+  }
+
+  public int capacity() {
+    return capacity;
+  }
+
+  public interface Visitor<K, V> {
+    void accept(K k, V v);
+  }
+
+  /**
+   * Visit all key, value pairs in the IdentityHashStore.
+   */
+  public void visitAll(Visitor<K, V> visitor) {
+    int length = buffer == null ? 0 : buffer.length;
+    for (int i = 0; i < length; i += 2) {
+      if (buffer[i] != null) {
+        visitor.accept((K)buffer[i], (V)buffer[i + 1]);
+      }
+    }
+  }
+}
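
A short sketch of the identity semantics described above (illustrative only, not part of this commit): lookups use ==, so an equals()-equal key is not found, and duplicate puts of the same key accumulate.

import org.apache.hadoop.util.IdentityHashStore;

public class IdentityHashStoreSketch {
  public static void main(String[] args) {
    IdentityHashStore<String, Integer> store =
        new IdentityHashStore<String, Integer>(2);
    String key = new String("k");
    String equalButDistinct = new String("k");  // equals(), but not ==
    store.put(key, 1);
    System.out.println(store.get(key));              // 1
    System.out.println(store.get(equalButDistinct)); // null: identity lookup
    store.put(key, 2);                               // duplicate key allowed
    System.out.println(store.numElements());         // 2
  }
}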

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestIdentityHashStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestIdentityHashStore.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestIdentityHashStore.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestIdentityHashStore.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.IdentityHashStore;
+import org.apache.hadoop.util.IdentityHashStore.Visitor;
+import org.junit.Test;
+
+public class TestIdentityHashStore {
+  private static final Log LOG = LogFactory.getLog(TestIdentityHashStore.class.getName());
+
+  private static class Key {
+    private final String name;
+
+    Key(String name) {
+      this.name = name;
+    }
+    
+    @Override
+    public int hashCode() {
+      throw new RuntimeException("should not be used!");
+    }
+    
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof Key)) {
+        return false;
+      }
+      Key other = (Key)o;
+      return name.equals(other.name);
+    }
+  }
+
+  @Test(timeout=60000)
+  public void testStartingWithZeroCapacity() {
+    IdentityHashStore<Key, Integer> store = 
+        new IdentityHashStore<Key, Integer>(0);
+    store.visitAll(new Visitor<Key, Integer>() {
+      @Override
+      public void accept(Key k, Integer v) {
+        Assert.fail("found key " + k + " in empty IdentityHashStore.");
+      }
+    });
+    Assert.assertTrue(store.isEmpty());
+    final Key key1 = new Key("key1");
+    Integer value1 = new Integer(100);
+    store.put(key1, value1);
+    Assert.assertTrue(!store.isEmpty());
+    Assert.assertEquals(value1, store.get(key1));
+    store.visitAll(new Visitor<Key, Integer>() {
+      @Override
+      public void accept(Key k, Integer v) {
+        Assert.assertEquals(key1, k);
+      }
+    });
+    Assert.assertEquals(value1, store.remove(key1));
+    Assert.assertTrue(store.isEmpty());
+  }
+  
+  @Test(timeout=60000)
+  public void testDuplicateInserts() {
+    IdentityHashStore<Key, Integer> store = 
+        new IdentityHashStore<Key, Integer>(4);
+    store.visitAll(new Visitor<Key, Integer>() {
+      @Override
+      public void accept(Key k, Integer v) {
+        Assert.fail("found key " + k + " in empty IdentityHashStore.");
+      }
+    });
+    Assert.assertTrue(store.isEmpty());
+    Key key1 = new Key("key1");
+    Integer value1 = new Integer(100);
+    Integer value2 = new Integer(200);
+    Integer value3 = new Integer(300);
+    store.put(key1, value1);
+    Key equalToKey1 = new Key("key1");
+
+    // IdentityHashStore compares keys by reference identity (==), not equals()
+    Assert.assertNull(store.get(equalToKey1)); 
+
+    Assert.assertTrue(!store.isEmpty());
+    Assert.assertEquals(value1, store.get(key1));
+    store.put(key1, value2);
+    store.put(key1, value3);
+    final List<Integer> allValues = new LinkedList<Integer>();
+    store.visitAll(new Visitor<Key, Integer>() {
+      @Override
+      public void accept(Key k, Integer v) {
+        allValues.add(v);
+      }
+    });
+    Assert.assertEquals(3, allValues.size());
+    for (int i = 0; i < 3; i++) {
+      Integer value = store.remove(key1);
+      Assert.assertTrue(allValues.remove(value));
+    }
+    Assert.assertNull(store.remove(key1));
+    Assert.assertTrue(store.isEmpty());
+  }
+  
+  @Test(timeout=60000)
+  public void testAdditionsAndRemovals() {
+    IdentityHashStore<Key, Integer> store = 
+        new IdentityHashStore<Key, Integer>(0);
+    final int NUM_KEYS = 1000;
+    LOG.debug("generating " + NUM_KEYS + " keys");
+    final List<Key> keys = new ArrayList<Key>(NUM_KEYS);
+    for (int i = 0; i < NUM_KEYS; i++) {
+      keys.add(new Key("key " + i));
+    }
+    for (int i = 0; i < NUM_KEYS; i++) {
+      store.put(keys.get(i), i);
+    }
+    store.visitAll(new Visitor<Key, Integer>() {
+      @Override
+      public void accept(Key k, Integer v) {
+        Assert.assertTrue(keys.contains(k));
+      }
+    });
+    for (int i = 0; i < NUM_KEYS; i++) {
+      Assert.assertEquals(Integer.valueOf(i),
+          store.remove(keys.get(i)));
+    }
+    store.visitAll(new Visitor<Key, Integer>() {
+      @Override
+      public void accept(Key k, Integer v) {
+        Assert.fail("expected all entries to be removed");
+      }
+    });
+    Assert.assertTrue("expected the store to be " +
+        "empty, but found " + store.numElements() + " elements.",
+        store.isEmpty());
+    Assert.assertEquals(1024, store.capacity());
+  }
+  
+}