Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2015/03/10 00:00:33 UTC

hadoop git commit: HDFS-7844. Create an off-heap hash table implementation (cmccabe)

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7836 2214dab60 -> 087611208


HDFS-7844. Create an off-heap hash table implementation (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/08761120
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/08761120
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/08761120

Branch: refs/heads/HDFS-7836
Commit: 0876112086d3995ce67f8f2dd26ef1fab563713a
Parents: 2214dab
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Mon Mar 9 15:43:59 2015 -0700
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Mon Mar 9 15:43:59 2015 -0700

----------------------------------------------------------------------
 .../hadoop/fs/CommonConfigurationKeys.java      |   3 +
 .../util/offheap/ByteArrayMemoryManager.java    | 272 ++++++++
 .../hadoop/util/offheap/MemoryManager.java      | 107 ++++
 .../util/offheap/NativeMemoryManager.java       | 143 +++++
 .../hadoop/util/offheap/ProbingHashTable.java   | 636 +++++++++++++++++++
 .../hadoop/util/offheap/TestMemoryManager.java  | 202 ++++++
 .../util/offheap/TestProbingHashTable.java      | 392 ++++++++++++
 7 files changed, 1755 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/08761120/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 442dc7d..ece58e3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -286,4 +286,7 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   public static final String NFS_EXPORTS_ALLOWED_HOSTS_SEPARATOR = ";";
   public static final String NFS_EXPORTS_ALLOWED_HOSTS_KEY = "nfs.exports.allowed.hosts";
   public static final String NFS_EXPORTS_ALLOWED_HOSTS_KEY_DEFAULT = "* rw";
+
+  public static final String HADOOP_MEMORY_MANAGER_KEY = "hadoop.memory.manager";
+  public static final String HADOOP_MEMORY_MANAGER_DEFAULT = "org.apache.hadoop.util.offheap.NativeMemoryManager";
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08761120/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/ByteArrayMemoryManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/ByteArrayMemoryManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/ByteArrayMemoryManager.java
new file mode 100644
index 0000000..57c7c76
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/ByteArrayMemoryManager.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.offheap;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ByteArrayMemoryManager is a memory manager which keeps all memory on the Java
+ * heap.  It is useful for testing, since it performs validation of all memory
+ * accesses and writes.  It can also be used when sun.misc.Unsafe is not
+ * available, although it will be slower than the off-heap
+ * implementation.
+ */
+@Private
+@Unstable
+public class ByteArrayMemoryManager implements MemoryManager {
+  static final Logger LOG =
+      LoggerFactory.getLogger(ByteArrayMemoryManager.class);
+
+  private final static long MAX_ADDRESS = 0x3fffffffffffffffL;
+
+  private final TreeMap<Long, byte[]> buffers = new TreeMap<Long, byte[]>();
+
+  private long curAddress = 1000;
+
+  private final String name;
+
+  public ByteArrayMemoryManager(String name) {
+    this.name = name;
+    LOG.debug("Created {}.", this);
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    Iterator<Entry<Long, byte[]>> iter = buffers.entrySet().iterator();
+    if (iter.hasNext()) {
+      StringBuilder bld = new StringBuilder();
+      Entry<Long, byte[]> entry = iter.next();
+      bld.append(entryToString(entry));
+      int numPrinted = 1;
+      while (iter.hasNext()) {
+        if (numPrinted >= 10) {
+          bld.append("...");
+          break;
+        }
+        bld.append(", ").append(entryToString(iter.next()));
+        numPrinted++;
+      }
+      throw new RuntimeException("There are still unfreed buffers.  " +
+          bld.toString());
+    }
+    LOG.debug("Closed {}.", this);
+  }
+
+  private static String entryToString(Entry<Long, byte[]> entry) {
+    StringBuilder bld = new StringBuilder();
+    bld.append("Entry(base=0x").append(Long.toHexString(entry.getKey())).
+        append(", len=0x").append(Long.toHexString(entry.getValue().length)).
+        append(")");
+    return bld.toString();
+  }
+
+  @Override
+  public synchronized long allocate(long size) {
+    if (curAddress + size > MAX_ADDRESS) {
+      throw new RuntimeException("Cannot allocate any more memory.");
+    }
+    if (size > 0x7fffffff) {
+      throw new RuntimeException("Attempted to allocate " + size +
+          " bytes, but we cannot allocate a Java byte array with " +
+          "more than 2^^31 entries.");
+    }
+    long addr = curAddress;
+    curAddress += size;
+    byte val[] = new byte[(int)size];
+    buffers.put(Long.valueOf(addr), val);
+    LOG.trace("Allocated Entry(base=0x{}, len=0x{})",
+        Long.toHexString(addr), Long.toHexString(val.length));
+    return addr;
+  }
+
+  @Override
+  public synchronized long allocateZeroed(long size) {
+    // Java byte arrays are always zeroed on construction.
+    return allocate(size);
+  }
+
+  @Override
+  public synchronized void free(long addr) {
+    byte val[] = buffers.remove(Long.valueOf(addr));
+    if (val == null) {
+      LOG.error("Attempted to free unallocated address 0x{}",
+          Long.toHexString(addr));
+    } else {
+      LOG.trace("Freed Entry(base=0x{}, len=0x{})",
+          Long.toHexString(addr), Long.toHexString(val.length));
+    }
+  }
+
+  private synchronized Entry<Long, byte[]> getEntry(long addr, String op) {
+    Entry<Long, byte[]> entry = buffers.floorEntry(Long.valueOf(addr));
+    if (entry == null) {
+      throw new RuntimeException(op + " unallocated address 0x" +
+          Long.toHexString(addr));
+    }
+    return entry;
+  }
+
+  @Override
+  public synchronized byte getByte(long addr) {
+    Entry<Long, byte[]> entry = getEntry(addr, "Accessed");
+    long off = addr - entry.getKey();
+    byte arr[] = entry.getValue();
+    if (off + 1 > arr.length) {
+      throw new RuntimeException("Attempted to read unallocated memory " +
+          "at 0x" + Long.toHexString(addr) + ".  Closest lower allocated area " +
+          "is " + entryToString(entry));
+    }
+    int i = (int)off;
+    return arr[i];
+  }
+
+  @Override
+  public synchronized void putByte(long addr, byte val) {
+    Entry<Long, byte[]> entry = getEntry(addr, "Wrote to");
+    long off = addr - entry.getKey();
+    byte arr[] = entry.getValue();
+    if (off + 1 > arr.length) {
+      throw new RuntimeException("Attempted to write to unallocated memory " +
+          "at 0x" + Long.toHexString(addr) + ".  Closest lower allocated area " +
+          "is " + entryToString(entry));
+    }
+    int i = (int)off;
+    arr[i] = val;
+  }
+
+  @Override
+  public synchronized short getShort(long addr) {
+    Entry<Long, byte[]> entry = getEntry(addr, "Accessed");
+    long off = addr - entry.getKey();
+    byte arr[] = entry.getValue();
+    if (off + 2 > arr.length) {
+      throw new RuntimeException("Attempted to read unallocated memory " +
+          "at 0x" + Long.toHexString(addr) + ".  Closest lower allocated " +
+          "area is " + entryToString(entry));
+    }
+    int i = (int)off;
+    return (short)((arr[i + 0] & 0xff) << 8 |
+        (arr[i + 1] & 0xff));
+  }
+
+  @Override
+  public synchronized void putShort(long addr, short val) {
+    Entry<Long, byte[]> entry = getEntry(addr, "Wrote to");
+    long off = addr - entry.getKey();
+    byte arr[] = entry.getValue();
+    if (off + 2 > arr.length) {
+      throw new RuntimeException("Attempted to write to unallocated memory " +
+          "at 0x" + Long.toHexString(addr) + ".  Closest lower allocated " +
+          "area is " + entryToString(entry));
+    }
+    int i = (int)off;
+    arr[i + 0] = (byte)((val >> 8) & 0xff);
+    arr[i + 1] = (byte)(val & 0xff);
+  }
+
+  @Override
+  public synchronized int getInt(long addr) {
+    Entry<Long, byte[]> entry = getEntry(addr, "Accessed");
+    long off = addr - entry.getKey();
+    byte arr[] = entry.getValue();
+    if (off + 4 > arr.length) {
+      throw new RuntimeException("Attempted to read unallocated memory " +
+          "at 0x" + Long.toHexString(addr) + ".  Closest lower allocated " +
+          "area is " + entryToString(entry));
+    }
+    int i = (int)off;
+    return arr[i + 0] << 24 | 
+           (arr[i + 1] & 0xff) << 16 |
+           (arr[i + 2] & 0xff) << 8 |
+           (arr[i + 3] & 0xff);
+  }
+
+  @Override
+  public synchronized void putInt(long addr, int val) {
+    Entry<Long, byte[]> entry = getEntry(addr, "Wrote to");
+    long off = addr - entry.getKey();
+    byte arr[] = entry.getValue();
+    if (off + 4 > arr.length) {
+      throw new RuntimeException("Attempted to write to unallocated memory " +
+          "at 0x" + Long.toHexString(addr) + ".  Closest lower allocated " +
+          "area is " + entryToString(entry));
+    }
+    int i = (int)off;
+    arr[i + 0] = (byte)((val >> 24) & 0xff);
+    arr[i + 1] = (byte)((val >> 16) & 0xff);
+    arr[i + 2] = (byte)((val >> 8) & 0xff);
+    arr[i + 3] = (byte)(val & 0xff);
+  }
+
+  @Override
+  public synchronized long getLong(long addr) {
+    Entry<Long, byte[]> entry = getEntry(addr, "Accessed");
+    long off = addr - entry.getKey();
+    byte arr[] = entry.getValue();
+    if (off + 8 > arr.length) {
+      throw new RuntimeException("Attempted to read unallocated memory " +
+          "at 0x" + Long.toHexString(addr) + ".  Closest lower allocated " +
+          "area is " + entryToString(entry));
+    }
+    int i = (int)off;
+    return (arr[i + 0] & 0xffL) << 56 |
+           (arr[i + 1] & 0xffL) << 48 |
+           (arr[i + 2] & 0xffL) << 40 |
+           (arr[i + 3] & 0xffL) << 32 |
+           (arr[i + 4] & 0xffL) << 24 |
+           (arr[i + 5] & 0xffL) << 16 |
+           (arr[i + 6] & 0xffL) << 8 |
+           (arr[i + 7] & 0xffL);
+  }
+
+  @Override
+  public synchronized void putLong(long addr, long val) {
+    Entry<Long, byte[]> entry = getEntry(addr, "Wrote to");
+    long off = addr - entry.getKey();
+    byte arr[] = entry.getValue();
+    if (off + 8 > arr.length) {
+      throw new RuntimeException("Attempted to write to unallocated memory " +
+          "at 0x" + Long.toHexString(addr) + ".  Closest lower allocated " +
+          "area is " + entryToString(entry));
+    }
+    int i = (int)off;
+    arr[i + 0] = (byte)((val >> 56) & 0xff);
+    arr[i + 1] = (byte)((val >> 48) & 0xff);
+    arr[i + 2] = (byte)((val >> 40) & 0xff);
+    arr[i + 3] = (byte)((val >> 32) & 0xff);
+    arr[i + 4] = (byte)((val >> 24) & 0xff);
+    arr[i + 5] = (byte)((val >> 16) & 0xff);
+    arr[i + 6] = (byte)((val >> 8) & 0xff);
+    arr[i + 7] = (byte)(val & 0xff);
+  }
+
+  @Override
+  public String toString() {
+    return "ByteArrayMemoryManager(" + name + ")";
+  }
+}
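
For illustration, here is a minimal usage sketch (not part of the commit; the
class name and values below are made up) showing how a caller might exercise
ByteArrayMemoryManager.  Every allocation must be freed before close(), or
close() throws:

    import org.apache.hadoop.util.offheap.ByteArrayMemoryManager;

    public class ByteArrayMemoryManagerExample {
      public static void main(String[] args) throws Exception {
        ByteArrayMemoryManager mman = new ByteArrayMemoryManager("example");
        long addr = mman.allocateZeroed(8);   // an opaque fake "address"
        mman.putLong(addr, 0xdeadbeefL);      // bounds-checked against the array
        assert mman.getLong(addr) == 0xdeadbeefL;
        mman.free(addr);                      // must free before close()
        mman.close();
      }
    }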

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08761120/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/MemoryManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/MemoryManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/MemoryManager.java
new file mode 100644
index 0000000..24c67fc
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/MemoryManager.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.offheap;
+
+import java.io.Closeable;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Allocates memory which may be off-heap.
+ *
+ * MemoryManager objects are thread-safe.  They can be used by multiple threads
+ * at once without additional synchronization.
+ */
+@Private
+@Unstable
+public interface MemoryManager extends Closeable {
+  /**
+   * Allocate a memory region.  Will never return 0.
+   */
+  long allocate(long size);
+
+  /**
+   * Allocate a zeroed memory region.  Will never return 0.
+   */
+  long allocateZeroed(long size);
+
+  /**
+   * Free memory.
+   */
+  void free(long addr);
+
+  byte getByte(long addr);
+
+  void putByte(long addr, byte val);
+
+  short getShort(long addr);
+
+  void putShort(long addr, short val);
+
+  int getInt(long addr);
+
+  void putInt(long addr, int val);
+
+  long getLong(long addr);
+
+  void putLong(long addr, long val);
+
+  String toString();
+
+  public static class Factory {
+    private static final Logger LOG = LoggerFactory.getLogger(Factory.class);
+
+    /**
+     * Create a MemoryManager from a Configuration.
+     *
+     * @param name      The name to give the MemoryManager.
+     * @param conf      The Configuration.
+     *
+     * @return          The MemoryManager.
+     */
+    public static MemoryManager create(String name, Configuration conf) {
+      String memoryManagerKey = conf.get(
+          CommonConfigurationKeys.HADOOP_MEMORY_MANAGER_KEY,
+          CommonConfigurationKeys.HADOOP_MEMORY_MANAGER_DEFAULT);
+      if (memoryManagerKey == null) {
+        memoryManagerKey = NativeMemoryManager.class.getCanonicalName();
+      }
+      Class<? extends MemoryManager> clazz =
+          (Class<? extends MemoryManager>)conf.
+              getClassByNameOrNull(memoryManagerKey);
+      if (clazz == null) {
+        LOG.error("Unable to locate {}: falling back on {}.",
+            memoryManagerKey, ByteArrayMemoryManager.class.getCanonicalName());
+      } else if (clazz != ByteArrayMemoryManager.class) {
+        try {
+          return clazz.getConstructor(String.class).newInstance(name);
+        } catch (Throwable t) {
+          LOG.error("Unable to create {}.  Falling back on {}", memoryManagerKey,
+              ByteArrayMemoryManager.class.getCanonicalName(), t);
+        }
+      }
+      return new ByteArrayMemoryManager(name);
+    }
+  }
+}
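
A short sketch of the Factory path above (again illustrative only): the
hadoop.memory.manager key selects the implementation, and the factory falls
back to ByteArrayMemoryManager if the configured class cannot be created:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeys;
    import org.apache.hadoop.util.offheap.MemoryManager;

    public class MemoryManagerFactoryExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Explicitly select the on-heap implementation for this example.
        conf.set(CommonConfigurationKeys.HADOOP_MEMORY_MANAGER_KEY,
            "org.apache.hadoop.util.offheap.ByteArrayMemoryManager");
        MemoryManager mman = MemoryManager.Factory.create("example", conf);
        long addr = mman.allocate(16);
        mman.putInt(addr, 42);
        System.out.println("read back: " + mman.getInt(addr));
        mman.free(addr);
        mman.close();
      }
    }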

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08761120/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/NativeMemoryManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/NativeMemoryManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/NativeMemoryManager.java
new file mode 100644
index 0000000..17a77e6
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/NativeMemoryManager.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.offheap;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.misc.Unsafe;
+
+/**
+ * NativeMemoryManager is a memory manager which uses sun.misc.Unsafe to
+ * allocate memory off-heap.  This memory will be allocated using the current
+ * platform's equivalent of malloc().
+ */
+@Private
+@Unstable
+public class NativeMemoryManager implements MemoryManager {
+  static final Logger LOG =
+      LoggerFactory.getLogger(NativeMemoryManager.class);
+
+  private final static Unsafe unsafe;
+
+  private final static String loadingFailureReason;
+
+  static {
+    Unsafe myUnsafe = null;
+    String myLoadingFailureReason = null;
+    try {
+      Field f = Unsafe.class.getDeclaredField("theUnsafe");
+      f.setAccessible(true);
+      myUnsafe = (Unsafe)f.get(null);
+    } catch (Throwable e) {
+      myLoadingFailureReason = e.getMessage();
+    } finally {
+      unsafe = myUnsafe;
+      loadingFailureReason = myLoadingFailureReason;
+    }
+  }
+
+  private final String name;
+
+  public static boolean isAvailable() {
+    return loadingFailureReason == null;
+  }
+
+  public NativeMemoryManager(String name) {
+    if (loadingFailureReason != null) {
+      LOG.error("Failed to load sun.misc.Unsafe: " + loadingFailureReason);
+      throw new RuntimeException("Failed to load sun.misc.Unsafe: " +
+          loadingFailureReason);
+    }
+    this.name = name;
+    LOG.debug("Created {}.", this);
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Nothing to do
+    LOG.debug("Closed {}.", this);
+  }
+
+  @Override
+  public long allocate(long size) {
+    return unsafe.allocateMemory(size);
+  }
+
+  @Override
+  public long allocateZeroed(long size) {
+    long addr = unsafe.allocateMemory(size);
+    unsafe.setMemory(addr, size, (byte)0);
+    return addr;
+  }
+
+  @Override
+  public void free(long addr) {
+    unsafe.freeMemory(addr);
+  }
+
+  @Override
+  public byte getByte(long addr) {
+    return unsafe.getByte(null, addr);
+  }
+
+  @Override
+  public void putByte(long addr, byte val) {
+    unsafe.putByte(null, addr, val);
+  }
+
+  @Override
+  public short getShort(long addr) {
+    return unsafe.getShort(null, addr);
+  }
+
+  @Override
+  public void putShort(long addr, short val) {
+    unsafe.putShort(null, addr, val);
+  }
+
+  @Override
+  public int getInt(long addr) {
+    return unsafe.getInt(null, addr);
+  }
+
+  @Override
+  public void putInt(long addr, int val) {
+    unsafe.putInt(null, addr, val);
+  }
+
+  @Override
+  public long getLong(long addr) {
+    return unsafe.getLong(null, addr);
+  }
+
+  @Override
+  public void putLong(long addr, long val) {
+    unsafe.putLong(null, addr, val);
+  }
+
+  @Override
+  public String toString() {
+    return "NativeMemoryManager(" + name + ")";
+  }
+}
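
Since sun.misc.Unsafe may not be present on every JVM, callers are expected to
check isAvailable() first, as the unit tests below do.  A minimal sketch
(illustrative, not part of the commit) of that pattern:

    import org.apache.hadoop.util.offheap.ByteArrayMemoryManager;
    import org.apache.hadoop.util.offheap.MemoryManager;
    import org.apache.hadoop.util.offheap.NativeMemoryManager;

    public class NativeMemoryManagerExample {
      public static void main(String[] args) throws Exception {
        // Fall back to the on-heap implementation when Unsafe is missing.
        MemoryManager mman = NativeMemoryManager.isAvailable()
            ? new NativeMemoryManager("example")
            : new ByteArrayMemoryManager("example");
        long addr = mman.allocateZeroed(8);
        mman.putLong(addr, 1234L);
        System.out.println(mman + " read back " + mman.getLong(addr));
        mman.free(addr);   // off-heap memory is not garbage collected
        mman.close();
      }
    }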

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08761120/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/ProbingHashTable.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/ProbingHashTable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/ProbingHashTable.java
new file mode 100644
index 0000000..d0ac2f8
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/ProbingHashTable.java
@@ -0,0 +1,636 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.offheap;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A hash table which can be stored off-heap and which uses linear probing.<p/>
+ *
+ * Not thread-safe.  Requires external synchronization.<p/>
+ *
+ * Each entry must be stored in a slot which takes a fixed number of bytes.  We
+ * assume that slots which are zeroed are empty.<p/>
+ *
+ * This hash table does not implement the Java collection interface, because we
+ * want to avoid some of the limitations of that interface.  For example, we
+ * want to be able to have more than 2^32 entries and to be able to use a hash
+ * function which is wider than 32 bits.<p/>
+ *
+ * This hash table uses linear probing rather than separate chaining to handle
+ * hash collisions.  When we hit a collision when inserting, we put the new
+ * element into the next open slot.<p/>
+ *
+ * When the hash table exceeds its maximum load factor, we double the size
+ * of the table.  This requires moving all existing entries.<p/>
+ */
+@Public
+@Unstable
+public class ProbingHashTable<K extends ProbingHashTable.Key,
+      E extends ProbingHashTable.Entry<K>> implements Closeable {
+  static final Logger LOG =
+      LoggerFactory.getLogger(ProbingHashTable.class);
+
+  /**
+   * Adapts a given entry class to work with this hash table.<p/>
+   *
+   * Specifically, the Adaptor handles storing elements into slots,
+   * retrieving them, clearing slots, and getting the hash code for entries.<p/>
+   */
+  public interface Adaptor<E> {
+    /**
+     * Get the slot size to use for this hash table.
+     *
+     * @return            How many bytes each slot is in the hash table.
+     */
+    int getSlotSize();
+
+    /**
+     * Load an entry from memory.
+     *
+     * @param addr        The address to use to create the entry.
+     *
+     * @return            null if the slot was empty; the entry, otherwise.
+     */
+    E load(long addr);
+
+    /**
+     * Store an entry to memory.
+     *
+     * @param e           The element to store to memory.
+     * @param addr        The address to store the element to.
+     */
+    void store(E e, long addr);
+
+    /**
+     * Clear a slot.
+     *
+     * @param addr        The address to clear.
+     */
+    void clear(long addr);
+  }
+
+  public interface Key {
+    /**
+     * Get a 64-bit hash code for this key.
+     */
+    long longHash();
+
+    /**
+     * Determine if this key equals another key.
+     */
+    boolean equals(Object other);
+
+    /**
+     * Get a human-readable representation of this key.
+     */
+    String toString();
+  }
+
+  /**
+   * An entry in the ProbingHashTable.
+   */
+  public interface Entry<K extends Key> {
+    /**
+     * Get the key for this entry.
+     */
+    K getKey();
+  }
+
+  /**
+   * The minimum size to allow.
+   */
+  private static final long MIN_SIZE = 4;
+
+  /**
+   * The name of this hash table.
+   */
+  private final String name;
+
+  /**
+   * The memory manager for this hash table.
+   */
+  private final MemoryManager mman;
+
+  /**
+   * The size of each slot in bytes.
+   */
+  private final int slotSize;
+
+  /**
+   * The adaptor to use.
+   */
+  private final Adaptor<E> adaptor;
+
+  /**
+   * The base address of the hash table.
+   */
+  private long base;
+
+  /**
+   * The current number of slots in the hash table.
+   */
+  private long numSlots;
+
+  /**
+   * The current number of entries in the hash table.
+   */
+  private long numEntries;
+
+  /**
+   * The maximum load factor for this hash table.
+   */
+  private float maxLoadFactor;
+
+  /**
+   * The number of entries we should double at.
+   */
+  private long expansionThreshold;
+
+  public static long roundUpToPowerOf2(long i) {
+    long r = 1;
+    while (r < i) {
+      r = r << 1;
+    }
+    return r;
+  }
+
+  /**
+   * Create a new ProbingHashTable.
+   *
+   * @param name              The name of the ProbingHashTable.
+   * @param mman              The memory manager to use.
+   * @param adaptor           The entry factory to use.
+   * @param initialSize       The initial size of the hash table (in number of
+   *                              slots, not elements.)  Will be rounded up to a
+   *                              power of 2.
+   * @param maxLoadFactor     The maximum load factor to allow before doubling
+   *                              the hash table size.
+   */
+  public ProbingHashTable(String name, MemoryManager mman, Adaptor<E> adaptor,
+                        long initialSize, float maxLoadFactor) {
+    this.name = name;
+    this.mman = mman;
+    this.slotSize = adaptor.getSlotSize();
+    this.adaptor = adaptor;
+    Preconditions.checkArgument(maxLoadFactor > 0.0f);
+    Preconditions.checkArgument(maxLoadFactor < 1.0f);
+    if (initialSize < MIN_SIZE) {
+      initialSize = MIN_SIZE;
+    }
+    this.numSlots = roundUpToPowerOf2((long)(initialSize / maxLoadFactor));
+    long allocLen = numSlots * slotSize;
+    this.base = mman.allocateZeroed(allocLen);
+    this.numEntries = 0;
+    this.maxLoadFactor = maxLoadFactor;
+    this.expansionThreshold = (long)(numSlots * maxLoadFactor);
+    LOG.debug("Created ProbingHashTable(name={}, mman={}, slotSize={}, " +
+          "adaptor={}, numSlots={}, base=0x{}, allocLen=0x{}, " +
+          "maxLoadFactor={}, expansionThreshold={})",
+          name, mman.toString(), slotSize,
+          adaptor.getClass().getCanonicalName(), numSlots,
+          Long.toHexString(base), Long.toHexString(allocLen), maxLoadFactor,
+          expansionThreshold);
+  }
+
+  /**
+   * Frees the memory associated with this hash table and does error checking.
+   */
+  public void close() throws IOException {
+    ProbingHashTableIterator iter = iterator();
+    if (iter.hasNext()) {
+      StringBuilder bld = new StringBuilder();
+      K k = iter.next();
+      bld.append(k.toString());
+      int numPrinted = 1;
+      while (iter.hasNext()) {
+        if (numPrinted >= 10) {
+          bld.append("...");
+          break;
+        }
+        bld.append(", ").append(iter.next().toString());
+        numPrinted++;
+      }
+      throw new RuntimeException("Attempted to close the hash table " +
+          " before all entries were removed.  There are still " + numEntries +
+          " entries remaining, including " + bld.toString());
+    }
+    free();
+  }
+
+  /**
+   * Frees the memory associated with this hash table.
+   */
+  void free() throws IOException {
+    if (this.base != 0) {
+      LOG.debug("Freeing {}.", this);
+      mman.free(this.base);
+      this.base = 0;
+    }
+  }
+
+  protected void finalize() throws Throwable {
+    try {
+      if (this.base != 0) {
+        LOG.error("Hash table {} was never closed.", this);
+        free();
+      }
+    } finally {
+      super.finalize();
+    }
+  }
+
+  private long getSlot(K key, long nSlots) {
+    long hash = key.longHash();
+    // Mask off the sign bit rather than negating the hash, since negating
+    // Long.MIN_VALUE would leave it negative and yield an invalid slot.
+    return (hash & 0x7fffffffffffffffL) % nSlots;
+  }
+
+  private E getInternal(K key, boolean remove) {
+    long originalSlot = getSlot(key, numSlots);
+    long slot = originalSlot;
+    long addr;
+    E target = null;
+    K targetKey = null;
+    while (true) {
+      addr = this.base + (slot * slotSize);
+      target = adaptor.load(addr);
+      if (target == null) {
+        // By the compactness invariant, we're done.  See below for more
+        // discussion.
+        LOG.trace("{}: getInternal(key={}, remove={}) found nothing.",
+            this, key, remove);
+        return null;
+      }
+      targetKey = target.getKey();
+      if (targetKey.equals(key)) {
+        break;
+      }
+      slot++;
+      if (slot == numSlots) {
+        slot = 0;
+      }
+      if (slot == originalSlot) {
+        LOG.trace("{}: getInternal(key={}, remove={}) found nothing",
+            this, key, remove);
+        return null;
+      }
+    }
+    if (remove) {
+      adaptor.clear(addr);
+      numEntries--;
+      maintainCompactness(slot);
+    }
+    LOG.trace("{}: getInternal(key={}, remove={}) found {}",
+        this, key, remove, targetKey);
+    return target;
+  }
+
+  /**
+   * Maintain the compactness invariant.<p/>
+   *
+   * In order to avoid doing a full array search when looking for an element
+   * that may not be in the hash table, we maintain a compactness invariant.
+   * The compactness invariant states that if we start at slot N and continue
+   * searching until we hit an empty slot, we will have searched all the
+   * possible places where the element could be.  We maintain the compactness
+   * invariant by doing a little bit of extra work each time we delete an entry.
+   * Specifically, we search forwards from the deleted entry, moving any keys
+   * that need to be moved to maintain the invariant.  We can stop searching
+   * when we hit an empty slot.<p/>
+   *
+ * Although maintaining the compactness invariant is O(N) in the worst case,
+ * it should be O(1) in the average case.  This is because the load factor
+ * is kept below the configured maximum, so a constant fraction of slots is
+ * always empty.  Assuming good hash dispersion, runs of occupied slots stay
+ * short, so the average number of entries we move here is a small constant.<p/>
+   */
+  private void maintainCompactness(long startSlot) {
+    long slot = startSlot;
+    while (true) {
+      slot++;
+      if (slot == numSlots) {
+        slot = 0;
+      }
+      if (slot == startSlot) {
+        return;
+      }
+      long addr = this.base + (slot * slotSize);
+      E e = adaptor.load(addr);
+      if (e == null) {
+        return;
+      }
+      E prevE = putInternal(e, false);
+      if (prevE != null) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("{}: {} was already in the right place.",
+              this, e.getKey());
+        }
+      } else {
+        // The put didn't actually add anything, it just moved something.
+        // So decrement numEntries to its previous value.
+        numEntries--;
+        adaptor.clear(addr);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("{}: moved {} to the right place.",
+              this, e.getKey());
+        }
+      }
+    }
+  }
+
+  public E remove(K key) {
+    return getInternal(key, true);
+  }
+
+  public E get(K key) {
+    return getInternal(key, false);
+  }
+
+  private void expandTable(long newNumSlots) {
+    LOG.info("{}: Expanding table from {} slots to {}...",
+        this, numSlots, newNumSlots);
+    long newBase = mman.allocateZeroed(newNumSlots * slotSize);
+    long oldNumSlots = this.numSlots;
+    long oldExpansionThreshold = this.expansionThreshold;
+    long oldBase = this.base;
+    long oldNumEntries = this.numEntries;
+    try {
+      // Switch the hash table over to using the new memory region.
+      long entriesRemaining = oldNumEntries;
+      this.numSlots = newNumSlots;
+      this.expansionThreshold = (long)(newNumSlots * maxLoadFactor);
+      this.base = newBase;
+      this.numEntries = 0;
+
+      for (long slot = 0; slot < oldNumSlots; slot++) {
+        long addr = oldBase + (slot * slotSize);
+        E e = adaptor.load(addr);
+        if (e != null) {
+          E prevEntry = putInternal(e, false);
+          if (prevEntry != null) {
+            LOG.error("{}: Unexpected duplicate encountered when resizing " +
+                    "hash table: entry {} duplicates {}.", this,
+                e.getKey(), prevEntry.getKey()
+            );
+          }
+          entriesRemaining--;
+        }
+      }
+      if (entriesRemaining != 0) {
+        LOG.error("{}: Unexpectedly failed to locate {} entries that we " +
+                "thought we needed to move when resizing the hash table.",
+            this, entriesRemaining
+        );
+      }
+      LOG.info("{}: Finished expanding hash table from {} slots to {}.  " +
+              "Moved {} keys.  Freed old memory base 0x{}.  Using new memory " +
+              "base 0x{}.", this, oldNumSlots, numSlots, numEntries,
+              Long.toHexString(oldBase), Long.toHexString(newBase));
+    } catch (Throwable t) {
+      // In general we should never get here, since the functions used
+      // above should not throw exceptions.  But it's nice to be safe.
+      LOG.error("{}: expanding failed!  Restoring old memory region.", this, t);
+
+      // Switch back to using the old memory region.
+      this.numSlots = oldNumSlots;
+      this.expansionThreshold = oldExpansionThreshold;
+      this.base = oldBase;
+      this.numEntries = oldNumEntries;
+      mman.free(newBase);
+      throw new RuntimeException("Failed to expand " + this, t);
+    }
+    mman.free(oldBase);
+  }
+
+  /**
+   * Expand the hash table if it is too full to accept another key.
+   */
+  private void expandTableIfNeeded() {
+    if (numEntries > expansionThreshold) {
+      expandTable(numSlots * 2L);
+    }
+  }
+
+  /**
+   * Put the entry into the hash table, optionally overwriting an existing
+   * equivalent entry.
+   *
+   * @param putEntry        The entry to add if absent.
+   * @param overwrite       If true, we will overwrite the entry which is equal
+   *                          to putEntry (if there is one.)  If false, we will
+   *                          simply return that entry, but not overwrite it.
+   *
+   * @return                The previous entry in the hash table that was equal
+   *                          to the one we wanted to insert.  null if there
+   *                          was no such entry.
+   */
+  private E putInternal(E putEntry, boolean overwrite) {
+    long slot = getSlot(putEntry.getKey(), numSlots);
+    K putKey = putEntry.getKey();
+
+    while (true) {
+      long addr = this.base + (slot * slotSize);
+      E e = adaptor.load(addr);
+      if (e == null) {
+        adaptor.store(putEntry, addr);
+        numEntries++;
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("{}: stored {} into slot {} (addr 0x{})",
+              this, putKey, slot, Long.toHexString(addr));
+        }
+        return null;
+      }
+      K k = e.getKey();
+      if (k.equals(putKey)) {
+        if (!overwrite) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("{}: could not store {} because we found an " +
+                "equivalent key {} in slot {} (addr 0x{})",
+                this, putKey, k, slot, Long.toHexString(addr));
+          }
+          return e;
+        }
+        // Overwrite the existing entry.
+        adaptor.store(putEntry, addr);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("{}: stored {} by overwriting the equivalent key {} " +
+              "in slot {} (addr 0x{})", this, putKey, k, slot,
+              Long.toHexString(addr));
+        }
+        return e;
+      }
+      slot++;
+      if (slot == numSlots) {
+        slot = 0;
+      }
+    }
+  }
+
+  /**
+   * Put the entry into the hash table if there is no entry in the hash table
+   * which is equivalent.
+   *
+   * @param putEntry    The entry to add.
+   *
+   * @return            null if the element was inserted.  Otherwise, the
+   *                      existing element which compares equal to the one we
+   *                      unsuccessfully tried to add.
+   */
+  public E putIfAbsent(E putEntry) {
+    expandTableIfNeeded(); // call this first in case it fails (very unlikely)
+    return putInternal(putEntry, false);
+  }
+
+  /**
+   * Put an entry into the hash table, overwriting any existing element
+   * which is equivalent.
+   *
+   * @param putEntry    The entry to add.
+   *
+   * @return            null if there was no equivalent element in the table;
+   *                      otherwise, the existing equivalent element, which
+   *                      will have been replaced by putEntry.
+   */
+  public E put(E putEntry) {
+    expandTableIfNeeded(); // call this first in case it fails (very unlikely)
+    return putInternal(putEntry, true);
+  }
+
+  /**
+   * Returns the current number of slots in the hash table.
+   */
+  public long numSlots() {
+    return numSlots;
+  }
+
+  /**
+   * Returns the size of the table.
+   */
+  public long size() {
+    return numEntries;
+  }
+
+  /**
+   * Returns true if the table is empty.
+   */
+  public boolean isEmpty() {
+    return numEntries == 0;
+  }
+
+  /**
+   * An iterator for the ProbingHashTable.<p/>
+   *
+   * Since ProbingHashTable has no internal synchronization, you are responsible
+   * for ensuring that there are no concurrent write operations on the hash
+   * table while an iterator function is being called.  The easiest way to do
+   * this is with external locking.<p/>
+   *
+   * You can still perform write operations after creating this iterator
+   * without invalidating the iterator object.  There are a few caveats:<p/>
+   * 1. Keys inserted after the iterator was created may or may not be
+   *    returned by the iterator.<p/>
+ * 2. If the hash table is enlarged due to adding more keys, this iterator
+ *    may return some keys more than once and miss others entirely.<p/>
+   */
+  private class ProbingHashTableIterator implements Iterator<K> {
+    private long slotId = 0;
+    private K curKey;
+    private K lastKey;
+
+    private boolean refillCurKey() {
+      while (slotId < ProbingHashTable.this.numSlots) {
+        long addr = base + (slotId * slotSize);
+        E e = adaptor.load(addr);
+        slotId++;
+        if (e != null) {
+          curKey = e.getKey();
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("{}: iterator found another key {} at slot {} " +
+                  "(address 0x{})", ProbingHashTable.this.toString(), curKey,
+                  (slotId - 1), Long.toHexString(addr));
+          }
+          return true;
+        }
+      }
+      LOG.trace("{}: no more keys to iterate over after reading all {} " +
+                "slots.", ProbingHashTable.this.toString(), slotId);
+      // Set slotId to Long.MAX_VALUE so that even if the hash table enlarges
+      // in the future, this iterator will continue to be at the end.
+      slotId = Long.MAX_VALUE;
+      return false;
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (curKey != null) {
+        return true;
+      }
+      return refillCurKey();
+    }
+
+    @Override
+    public K next() {
+      if (curKey == null) {
+        if (!refillCurKey()) {
+          throw new NoSuchElementException();
+        }
+      }
+      K key = curKey;
+      curKey = null;
+      lastKey = key;
+      return key;
+    }
+
+    @Override
+    public void remove() {
+      if (lastKey == null) {
+        throw new IllegalStateException();
+      }
+      K key = lastKey;
+      lastKey = null;
+      if (ProbingHashTable.this.remove(key) == null) {
+        throw new NoSuchElementException("No such element as " +
+            key.toString());
+      }
+    }
+  }
+
+  public ProbingHashTableIterator iterator() {
+    return new ProbingHashTableIterator();
+  }
+
+  @Override
+  public String toString() {
+    return "ProbingHashTable(" + name + ")";
+  }
+}
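
The Adaptor/Key/Entry pattern above is exercised by TestProbingHashTable
further down.  As an illustrative sketch (not part of the commit; all names
below are made up), the same pattern can be condensed by storing each entry
inline as a single nonzero long.  Recall that an all-zero slot means "empty",
so id 0 must be reserved:

    import org.apache.hadoop.util.offheap.ByteArrayMemoryManager;
    import org.apache.hadoop.util.offheap.MemoryManager;
    import org.apache.hadoop.util.offheap.ProbingHashTable;

    public class ProbingHashTableExample {
      static class IdKey implements ProbingHashTable.Key {
        final long id;
        IdKey(long id) { this.id = id; }
        @Override public long longHash() { return id * 0x9e3779b97f4a7c15L; }
        @Override public boolean equals(Object o) {
          return (o instanceof IdKey) && (((IdKey)o).id == id);
        }
        @Override public String toString() { return "IdKey(" + id + ")"; }
      }

      static class IdEntry implements ProbingHashTable.Entry<IdKey> {
        final long id;
        IdEntry(long id) { this.id = id; }
        @Override public IdKey getKey() { return new IdKey(id); }
      }

      static class IdAdaptor implements ProbingHashTable.Adaptor<IdEntry> {
        private final MemoryManager mman;
        IdAdaptor(MemoryManager mman) { this.mman = mman; }
        @Override public int getSlotSize() { return 8; }
        @Override public IdEntry load(long addr) {
          long id = mman.getLong(addr);          // 0 means an empty slot
          return (id == 0) ? null : new IdEntry(id);
        }
        @Override public void store(IdEntry e, long addr) {
          mman.putLong(addr, e.id);
        }
        @Override public void clear(long addr) { mman.putLong(addr, 0L); }
      }

      public static void main(String[] args) throws Exception {
        MemoryManager mman = new ByteArrayMemoryManager("example");
        ProbingHashTable<IdKey, IdEntry> htable =
            new ProbingHashTable<IdKey, IdEntry>(
                "example", mman, new IdAdaptor(mman), 16, 0.5f);
        for (long id = 1; id <= 100; id++) {
          htable.putIfAbsent(new IdEntry(id));   // triggers several doublings
        }
        System.out.println("size=" + htable.size() +
            ", slots=" + htable.numSlots());
        for (long id = 1; id <= 100; id++) {
          htable.remove(new IdKey(id));          // compactness is maintained
        }
        htable.close();                          // ok: all entries removed
        mman.close();
      }
    }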

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08761120/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/offheap/TestMemoryManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/offheap/TestMemoryManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/offheap/TestMemoryManager.java
new file mode 100644
index 0000000..6ffd37f
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/offheap/TestMemoryManager.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.offheap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestMemoryManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestMemoryManager.class);
+
+  @Test(timeout=60000)
+  public void testAllocateAndFreeOnHeap() throws Exception {
+    ByteArrayMemoryManager mman = new ByteArrayMemoryManager("test");
+    testAllocateAndFree(mman);
+    mman.close();
+  }
+
+  @Test(timeout=60000)
+  public void testAllocateAndFreeOffHeap() throws Exception {
+    Assume.assumeTrue(NativeMemoryManager.isAvailable());
+    NativeMemoryManager mman = new NativeMemoryManager("test");
+    testAllocateAndFree(mman);
+    mman.close();
+  }
+
+  private void testAllocateAndFree(MemoryManager mman) throws Exception {
+    long addr = mman.allocate(100);
+    Assert.assertTrue("Expected addr to be non-zero.", addr != 0);
+    mman.free(addr);
+  }
+
+  @Test(timeout=60000)
+  public void testGetAndSetOnHeap() throws Exception {
+    ByteArrayMemoryManager mman = new ByteArrayMemoryManager("test");
+    testGetAndSet(mman);
+    mman.close();
+  }
+
+  @Test(timeout=60000)
+  public void testGetAndSetOffHeap() throws Exception {
+    Assume.assumeTrue(NativeMemoryManager.isAvailable());
+    NativeMemoryManager mman = new NativeMemoryManager("test");
+    testGetAndSet(mman);
+    mman.close();
+  }
+
+  private void testGetAndSet(MemoryManager mman) throws Exception {
+    LOG.info("testingGetAndSet with " + mman.getClass().getCanonicalName());
+    long byteAddr = mman.allocateZeroed(1);
+    Assert.assertTrue("Expected addr to be non-zero.", byteAddr != 0);
+    byte b = mman.getByte(byteAddr);
+    Assert.assertEquals((byte)0, b);
+    mman.putByte(byteAddr, (byte) 42);
+    b = mman.getByte(byteAddr);
+    Assert.assertEquals((byte)42, b);
+
+    long intAddr = mman.allocateZeroed(4);
+    Assert.assertTrue("Expected addr to be non-zero.", intAddr != 0);
+    int i = mman.getInt(intAddr);
+    Assert.assertEquals(0, i);
+    mman.putInt(intAddr, 0xfea01234);
+    i = mman.getInt(intAddr);
+    Assert.assertEquals(0xfea01234, i);
+
+    long shortAddr = mman.allocateZeroed(2);
+    Assert.assertTrue("Expected addr to be non-zero.", shortAddr != 0);
+    short s = mman.getShort(shortAddr);
+    Assert.assertEquals(0, s);
+    mman.putShort(shortAddr, (short) 0xeecc);
+    s = mman.getShort(shortAddr);
+    Assert.assertEquals((short)0xeecc, s);
+
+    long longAddr = mman.allocateZeroed(8);
+    Assert.assertTrue("Expected addr to be non-zero.", longAddr != 0);
+    long l = mman.getLong(longAddr);
+    Assert.assertEquals(0, l);
+    long testVal = 0x3ea0123400112233L;
+    LOG.info("longAddr = " + longAddr + ", testVal = "  + testVal);
+    mman.putLong(longAddr, testVal);
+    l = mman.getLong(longAddr);
+    LOG.info("got back " + l + " from " + longAddr);
+    Assert.assertEquals(testVal, l);
+
+    mman.free(byteAddr);
+    mman.free(intAddr);
+    mman.free(shortAddr);
+    mman.free(longAddr);
+  }
+
+  @Test(timeout=60000)
+  public void testCatchInvalidPuts() throws Exception {
+    ByteArrayMemoryManager mman = new ByteArrayMemoryManager("test");
+    long addr = mman.allocate(1);
+    mman.putByte(addr, (byte)1); // should succeed
+    try {
+      mman.putInt(addr, 0xdeadbeef);
+      Assert.fail("expected to catch invalid put");
+    } catch (RuntimeException e) {
+    }
+    try {
+      mman.putByte(addr + 1, (byte) 1);
+      Assert.fail("expected to catch invalid put");
+    } catch (RuntimeException e) {
+    }
+    try {
+      mman.putLong(addr, 11111111111L);
+      Assert.fail("expected to catch invalid put");
+    } catch (RuntimeException e) {
+    }
+    mman.free(addr);
+    try {
+      mman.putByte(addr, (byte)1);
+      Assert.fail("expected to catch invalid put");
+    } catch (RuntimeException e) {
+    }
+    try {
+      mman.putShort(addr, (short) 101);
+      Assert.fail("expected to catch invalid put");
+    } catch (RuntimeException e) {
+    }
+    try {
+      mman.putInt(addr + 1, 0xfaceface);
+      Assert.fail("expected to catch invalid put");
+    } catch (RuntimeException e) {
+    }
+    try {
+      mman.putLong(addr, 0xf00L);
+      Assert.fail("expected to catch invalid put");
+    } catch (RuntimeException e) {
+    }
+    mman.close();
+  }
+
+  private void testMemoryManagerCreate(
+      String className, String createdClassName) throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(CommonConfigurationKeys.HADOOP_MEMORY_MANAGER_KEY, className);
+    MemoryManager mman = MemoryManager.Factory.create("test", conf);
+    Assert.assertNotNull(mman);
+    Assert.assertEquals(createdClassName, mman.getClass().getCanonicalName());
+    mman.close();
+  }
+
+  @Test(timeout=60000)
+  public void testByteArrayMemoryManagerCreate() throws Exception {
+    testMemoryManagerCreate(
+        "org.apache.hadoop.util.offheap.ByteArrayMemoryManager",
+        "org.apache.hadoop.util.offheap.ByteArrayMemoryManager");
+  }
+
+  @Test(timeout=60000)
+  public void testNativeMemoryManagerCreate() throws Exception {
+    Assume.assumeTrue(NativeMemoryManager.isAvailable());
+    testMemoryManagerCreate(
+        "org.apache.hadoop.util.offheap.NativeMemoryManager",
+        "org.apache.hadoop.util.offheap.NativeMemoryManager");
+  }
+
+  @Test(timeout=60000)
+  public void testDefaultMemoryManagerCreate() throws Exception {
+    testMemoryManagerCreate(
+        "org.apache.hadoop.util.offheap.NonExistentMemoryManager",
+        "org.apache.hadoop.util.offheap.ByteArrayMemoryManager");
+  }
+
+  @Test(timeout=60000)
+  public void testByteArrayMemoryDirtyClose() throws Exception {
+    ByteArrayMemoryManager mman = new ByteArrayMemoryManager("test");
+    long addr = mman.allocate(1);
+    try {
+      mman.close();
+      Assert.fail("expected close to fail since we did not free all " +
+          "allocations first.");
+    } catch (RuntimeException e) {
+      GenericTestUtils.assertExceptionContains("There are still unfreed " +
+          "buffers", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08761120/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/offheap/TestProbingHashTable.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/offheap/TestProbingHashTable.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/offheap/TestProbingHashTable.java
new file mode 100644
index 0000000..bb68f35
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/offheap/TestProbingHashTable.java
@@ -0,0 +1,392 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.offheap;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+
+public class TestProbingHashTable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestProbingHashTable.class);
+
+  @Before
+  public void before() {
+    GenericTestUtils.setLogLevel(ProbingHashTable.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(NativeMemoryManager.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(ByteArrayMemoryManager.LOG, Level.ALL);
+  }
+
+  private static class TestBlockId implements ProbingHashTable.Key {
+    private static final HashFunction hashFunction = Hashing.goodFastHash(64);
+
+    private final long id;
+
+    TestBlockId(long id) {
+      this.id = id;
+    }
+
+    @Override
+    public long longHash() {
+      return hashFunction.newHasher().putLong(id).hash().asLong();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o == null || o.getClass() != this.getClass()) {
+        return false;
+      }
+      return id == ((TestBlockId)o).id;
+    }
+
+    @Override
+    public String toString() {
+      return "TestBlockId(0x" + Long.toHexString(id) + ")";
+    }
+
+    // Just for use with java.util.HashMap
+    @Override
+    public int hashCode() {
+      return (int)(id & 0xfffffff) ^ (int)((id >> 32) & 0xffffffff);
+    }
+  }
+
+  private static class TestBlockInfo
+        implements ProbingHashTable.Entry<TestBlockId>, Closeable {
+    /**
+     * The memory manager to use.
+     */
+    private MemoryManager mman;
+
+    /**
+     * The address of the block reference.
+     */
+    private long addr;
+
+    private static final long BLOCK_ID_OFF = 0;
+
+    private static final long TOTAL_LEN = 8;
+
+    static TestBlockInfo allocZeroed(MemoryManager mman) {
+      long addr = mman.allocateZeroed(TOTAL_LEN);
+      return new TestBlockInfo(mman, addr);
+    }
+
+    TestBlockInfo(MemoryManager mman, long addr) {
+      this.mman = mman;
+      this.addr = addr;
+    }
+
+    public long getBlockId() {
+      return mman.getLong(addr + BLOCK_ID_OFF);
+    }
+
+    public void setBlockId(long blockId) {
+      mman.putLong(addr + BLOCK_ID_OFF, blockId);
+    }
+
+    public void close() throws IOException {
+      mman.free(addr);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o == null || o.getClass() != TestBlockInfo.class) {
+        return false;
+      }
+      TestBlockInfo other = (TestBlockInfo)o;
+      return (other.getBlockId() == getBlockId());
+    }
+
+    @Override
+    public TestBlockId getKey() {
+      return new TestBlockId(getBlockId());
+    }
+  }
+
+  private static class TestBlockInfoAdaptor
+        implements ProbingHashTable.Adaptor<TestBlockInfo> {
+    private final MemoryManager mman;
+
+    TestBlockInfoAdaptor(MemoryManager mman) {
+      this.mman = mman;
+    }
+
+    @Override
+    public int getSlotSize() {
+      return 8;
+    }
+
+    @Override
+    public TestBlockInfo load(long addr) {
+      long infoAddr = mman.getLong(addr);
+      if (infoAddr == 0) {
+        return null;
+      }
+      return new TestBlockInfo(mman, infoAddr);
+    }
+
+    @Override
+    public void store(TestBlockInfo info, long addr) {
+      mman.putLong(addr, info.addr);
+    }
+
+    @Override
+    public void clear(long addr) {
+      mman.putLong(addr, 0L);
+    }
+  }
+
+  private void testAllocateAndFree(MemoryManager mman) throws Exception {
+    TestBlockInfoAdaptor adaptor = new TestBlockInfoAdaptor(mman);
+    ProbingHashTable<TestBlockId, TestBlockInfo> htable =
+        new ProbingHashTable<TestBlockId, TestBlockInfo>(
+            "testAllocateAndFreeTable", mman, adaptor, 100, 0.5f);
+    // should have been rounded up to 256
+    Assert.assertEquals(256, htable.numSlots());
+    htable.close();
+  }
+
+  @Test(timeout=60000)
+  public void testAllocateAndFreeOnHeap() throws Exception {
+    ByteArrayMemoryManager mman = new ByteArrayMemoryManager("test");
+    testAllocateAndFree(mman);
+    mman.close();
+  }
+
+  @Test(timeout=60000)
+  public void testAllocateAndFreeOffHeap() throws Exception {
+    Assume.assumeTrue(NativeMemoryManager.isAvailable());
+    NativeMemoryManager mman = new NativeMemoryManager("test");
+    testAllocateAndFree(mman);
+    mman.close();
+  }
+
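+  /**
+   * Allocate numBlocks off-heap block infos with consecutive ids starting
+   * at initialBlockId.  If any allocation fails, the infos allocated so
+   * far are freed before the exception propagates.
+   */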
+  private static TestBlockInfo[] createBlockInfos(MemoryManager mman,
+                                    int initialBlockId, int numBlocks) {
+    TestBlockInfo[] infos = new TestBlockInfo[numBlocks];
+    boolean success = false;
+    try {
+      for (int i = 0; i < numBlocks; i++) {
+        infos[i] = TestBlockInfo.allocZeroed(mman);
+        infos[i].setBlockId(initialBlockId + i);
+        LOG.info("allocated infos[{}] with id {}", i, infos[i].getBlockId());
+      }
+      success = true;
+      return infos;
+    } finally {
+      if (!success) {
+        freeBlockInfos(infos);
+      }
+    }
+  }
+
+  private static void freeBlockInfos(TestBlockInfo[] infos) {
+    if (infos != null) {
+      for (int i = 0; i < infos.length; i++) {
+        if (infos[i] != null) {
+          IOUtils.cleanup(null, infos[i]);
+        }
+      }
+    }
+  }
+
+  private void testAddRemove(MemoryManager mman) throws Exception {
+    TestBlockInfoAdaptor adaptor = new TestBlockInfoAdaptor(mman);
+    ProbingHashTable<TestBlockId, TestBlockInfo> htable =
+        new ProbingHashTable<TestBlockId, TestBlockInfo>(
+            "testAddRemoveTable", mman, adaptor, 10, 0.5f);
+    TestBlockInfo[] infos = null;
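+    // 10 entries at a 0.5f load factor requires 20 slots,
+    // rounded up to the next power of two: 32.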
+    Assert.assertEquals(32, htable.numSlots());
+    Assert.assertTrue(htable.isEmpty());
+    Assert.assertEquals(0, htable.size());
+    infos = createBlockInfos(mman, 1, 6);
+    for (int i = 0; i < infos.length; i++) {
+      LOG.info("Putting {} into {}", infos[i].getKey(), htable);
+      TestBlockInfo prev = htable.putIfAbsent(infos[i]);
+      Assert.assertNull(prev);
+    }
+    Assert.assertFalse(htable.isEmpty());
+    Assert.assertEquals(infos.length, htable.size());
+
+    // Test that we can iterate over all elements in the hash table.
+    Iterator<TestBlockId> iter = htable.iterator();
+    Assert.assertNotNull(iter);
+    HashSet<TestBlockId> contents = new HashSet<TestBlockId>();
+    for (TestBlockInfo info : infos) {
+      contents.add(info.getKey());
+    }
+    for (int i = 0; i < infos.length; i++) {
+      Assert.assertTrue(iter.hasNext());
+      TestBlockId blockId = iter.next();
+      Assert.assertTrue("Iterator returned " + blockId + ", which was " +
+          "either never inserted or already returned.", contents.remove(blockId));
+    }
+    Assert.assertFalse(iter.hasNext());
+    Assert.assertEquals("The iterator failed to return " + contents.size() +
+        " of the inserted entries.", 0, contents.size());
+
+    for (int i = 0; i < infos.length; i++) {
+      LOG.info("Removing {} from {}", infos[i].getKey(), htable);
+      TestBlockInfo prev = htable.remove(infos[i].getKey());
+      Assert.assertNotNull("unable to remove " + infos[i].getKey() +
+          " from the ProbingHashTable.", prev);
+    }
+    Assert.assertTrue(htable.isEmpty());
+    freeBlockInfos(infos);
+    htable.close();
+  }
+
+  @Test(timeout=60000)
+  public void testAddRemoveOnHeap() throws Exception {
+    ByteArrayMemoryManager mman = new ByteArrayMemoryManager("test");
+    testAddRemove(mman);
+    mman.close();
+  }
+
+  @Test(timeout=60000)
+  public void testAddRemoveOffHeap() throws Exception {
+    Assume.assumeTrue(NativeMemoryManager.isAvailable());
+    NativeMemoryManager mman = new NativeMemoryManager("test");
+    testAddRemove(mman);
+    mman.close();
+  }
+
+  private void testEnlargeHashTable(MemoryManager mman) throws Exception {
+    TestBlockInfoAdaptor adaptor = new TestBlockInfoAdaptor(mman);
+    ProbingHashTable<TestBlockId, TestBlockInfo> htable =
+        new ProbingHashTable<TestBlockId, TestBlockInfo>(
+            "testEnlargeHashTable", mman, adaptor, 4, 0.5f);
+    TestBlockInfo[] infos = null;
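+    // 4 entries at a 0.5f load factor requires 8 slots,
+    // which is already a power of two.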
+    Assert.assertEquals(8, htable.numSlots());
+    Assert.assertTrue(htable.isEmpty());
+    Assert.assertEquals(0, htable.size());
+    infos = createBlockInfos(mman, 1, 33);
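+    // The first 4 insertions fill the table to exactly its 0.5f load
+    // factor, so it should not grow yet.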
+    for (int i = 0; i < 4; i++) {
+      LOG.info("Putting {} into {}", infos[i].getKey(), htable);
+      TestBlockInfo prev = htable.putIfAbsent(infos[i]);
+      Assert.assertNull(prev);
+    }
+    Assert.assertEquals(8, htable.numSlots());
+    Assert.assertFalse(htable.isEmpty());
+    Assert.assertEquals(4, htable.size());
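+    // 4 more insertions push past the 0.5f load factor,
+    // doubling the table to 16 slots.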
+    for (int i = 4; i < 8; i++) {
+      LOG.info("Putting {} into {}", infos[i].getKey(), htable);
+      TestBlockInfo prev = htable.putIfAbsent(infos[i]);
+      Assert.assertNull(prev);
+    }
+    Assert.assertEquals(16, htable.numSlots());
+    Assert.assertFalse(htable.isEmpty());
+    Assert.assertEquals(8, htable.size());
+
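+    // Growing to 16 entries pushes past the 0.5f load factor again,
+    // doubling the table to 32 slots.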
+    for (int i = 8; i < 16; i++) {
+      LOG.info("Putting {} into {}", infos[i].getKey(), htable);
+      TestBlockInfo prev = htable.putIfAbsent(infos[i]);
+      Assert.assertNull(prev);
+    }
+    Assert.assertEquals(32, htable.numSlots());
+    Assert.assertFalse(htable.isEmpty());
+    Assert.assertEquals(16, htable.size());
+
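+    // The remaining 17 insertions (33 entries in total) force one more
+    // doubling, to 64 slots.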
+    for (int i = 16; i < infos.length; i++) {
+      LOG.info("Putting {} into {}", infos[i].getKey(), htable);
+      TestBlockInfo prev = htable.putIfAbsent(infos[i]);
+      Assert.assertNull(prev);
+    }
+    Assert.assertEquals(64, htable.numSlots());
+    Assert.assertFalse(htable.isEmpty());
+    Assert.assertEquals(33, htable.size());
+
+    // Delete every other element.  Removals do not shrink the table;
+    // it stays at 64 slots.
+    for (int i = 0; i < infos.length; i+=2) {
+      LOG.info("Removing {} from {}", infos[i].getKey(), htable);
+      TestBlockInfo prev = htable.remove(infos[i].getKey());
+      Assert.assertNotNull("unable to remove " + infos[i].getKey() +
+          " from the ProbingHashTable.", prev);
+    }
+    Assert.assertEquals(64, htable.numSlots());
+    Assert.assertFalse(htable.isEmpty());
+    Assert.assertEquals(16, htable.size());
+
+    // Test that we can iterate over all remaining elements in the hash table.
+    Iterator<TestBlockId> iter = htable.iterator();
+    Assert.assertNotNull(iter);
+    HashSet<TestBlockId> contents = new HashSet<TestBlockId>();
+    for (int i = 1; i < infos.length; i+=2) {
+      contents.add(infos[i].getKey());
+    }
+    for (int i = 1; i < infos.length; i+=2) {
+      Assert.assertTrue(iter.hasNext());
+      TestBlockId blockId = iter.next();
+      Assert.assertTrue("Iterator returned " + blockId + ", which was " +
+          "either never inserted or already returned.", contents.remove(blockId));
+    }
+    Assert.assertFalse(iter.hasNext());
+    Assert.assertEquals("The iterator failed to return " + contents.size() +
+        " of the inserted entries.", 0, contents.size());
+
+    // Delete remaining elements
+    for (int i = 1; i < infos.length; i+=2) {
+      LOG.info("Removing {} from {}", infos[i].getKey(), htable);
+      TestBlockInfo prev = htable.remove(infos[i].getKey());
+      Assert.assertNotNull("unable to remove " + infos[i].getKey() +
+          " from the ProbingHashTable.", prev);
+    }
+    Assert.assertEquals(64, htable.numSlots());
+    Assert.assertTrue(htable.isEmpty());
+    Assert.assertEquals(0, htable.size());
+
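+    // An iterator over the now-empty table should be exhausted immediately.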
+    iter = htable.iterator();
+    Assert.assertNotNull(iter);
+    Assert.assertFalse(iter.hasNext());
+
+    freeBlockInfos(infos);
+    htable.close();
+  }
+
+  @Test(timeout=60000)
+  public void testEnlargeHashTableOnHeap() throws Exception {
+    ByteArrayMemoryManager mman = new ByteArrayMemoryManager("test");
+    testEnlargeHashTable(mman);
+    mman.close();
+  }
+
+  @Test(timeout=60000)
+  public void testEnlargeHashTableOffHeap() throws Exception {
+    Assume.assumeTrue(NativeMemoryManager.isAvailable());
+    NativeMemoryManager mman = new NativeMemoryManager("test");
+    testEnlargeHashTable(mman);
+    mman.close();
+  }
+}