You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2014/07/24 08:22:03 UTC

svn commit: r1613006 - in /hadoop/common/trunk/hadoop-common-project/hadoop-common: CHANGES.txt src/main/java/org/apache/hadoop/util/DirectBufferPool.java src/test/java/org/apache/hadoop/util/TestDirectBufferPool.java

Author: todd
Date: Thu Jul 24 06:22:02 2014
New Revision: 1613006

URL: http://svn.apache.org/r1613006
Log:
HADOOP-10882. Move DirectBufferPool into common util. Contributed by Todd Lipcon.

Added:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DirectBufferPool.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestDirectBufferPool.java
Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1613006&r1=1613005&r2=1613006&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Thu Jul 24 06:22:02 2014
@@ -455,6 +455,8 @@ Release 2.6.0 - UNRELEASED
     HADOOP-10887. Add XAttrs to ViewFs and make XAttrs + ViewFileSystem
     internal dir behavior consistent. (Stephen Chu via wang)
 
+    HADOOP-10882. Move DirectBufferPool into common util. (todd)
+
   OPTIMIZATIONS
 
   BUG FIXES

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DirectBufferPool.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DirectBufferPool.java?rev=1613006&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DirectBufferPool.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DirectBufferPool.java Thu Jul 24 06:22:02 2014
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A simple class for pooling direct ByteBuffers. This is necessary
+ * because direct ByteBuffers do not take up much space on the heap,
+ * and hence will not trigger GCs on their own. However, they do take
+ * native memory, and thus can cause high memory usage if not pooled.
+ * The pooled instances are referred to only via weak references, allowing
+ * them to be collected when a GC does run.
+ *
+ * <p>This class only does effective pooling when many buffers will be
+ * allocated at the same size. There is no attempt to reuse larger
+ * buffers to satisfy smaller allocations.
+ *
+ * <p>Instances may be shared between threads: all state lives in a
+ * {@link ConcurrentMap} of {@link ConcurrentLinkedQueue}s and no locks
+ * are held.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
+@InterfaceStability.Evolving
+public class DirectBufferPool {
+
+  // Essentially implement a multimap with weak values: buffer capacity
+  // maps to a queue of weakly-referenced idle buffers of that capacity.
+  final ConcurrentMap<Integer, Queue<WeakReference<ByteBuffer>>> buffersBySize =
+    new ConcurrentHashMap<Integer, Queue<WeakReference<ByteBuffer>>>();
+
+  /**
+   * Allocate a direct buffer of the specified size, in bytes.
+   * If a pooled buffer of exactly this capacity is available, returns
+   * that. Otherwise allocates a new one.
+   *
+   * <p>A pooled buffer comes back cleared (position 0, limit equal to
+   * its capacity, no mark), but its contents are whatever the previous
+   * user left in it; they are not zeroed.
+   *
+   * @param size the capacity of the buffer, in bytes
+   * @return a direct buffer whose capacity is {@code size}
+   * @throws IllegalArgumentException if {@code size} is negative
+   */
+  public ByteBuffer getBuffer(int size) {
+    Queue<WeakReference<ByteBuffer>> list = buffersBySize.get(size);
+    if (list == null) {
+      // no available buffers for this size
+      return ByteBuffer.allocateDirect(size);
+    }
+
+    // Discard references whose buffers were already garbage collected,
+    // stopping at the first one that is still alive.
+    WeakReference<ByteBuffer> ref;
+    while ((ref = list.poll()) != null) {
+      ByteBuffer b = ref.get();
+      if (b != null) {
+        return b;
+      }
+    }
+
+    return ByteBuffer.allocateDirect(size);
+  }
+
+  /**
+   * Return a buffer into the pool. After being returned,
+   * the buffer may be recycled, so the user must not
+   * continue to use it in any way.
+   *
+   * <p>Only writable direct buffers are pooled. Heap buffers are dropped
+   * because {@link #getBuffer(int)} promises a direct buffer, and
+   * read-only views are dropped because its callers expect to be able to
+   * write into what they receive. Each buffer must be returned at most
+   * once per {@link #getBuffer(int)} call, or it may be handed out twice.
+   *
+   * @param buf the buffer to return
+   */
+  public void returnBuffer(ByteBuffer buf) {
+    if (!buf.isDirect() || buf.isReadOnly()) {
+      // getBuffer() must never hand these out; let the GC reclaim them.
+      return;
+    }
+    buf.clear(); // reset mark, limit, etc
+    int size = buf.capacity();
+    Queue<WeakReference<ByteBuffer>> list = buffersBySize.get(size);
+    if (list == null) {
+      list = new ConcurrentLinkedQueue<WeakReference<ByteBuffer>>();
+      Queue<WeakReference<ByteBuffer>> prev = buffersBySize.putIfAbsent(size, list);
+      // someone else put a queue in the map before we did
+      if (prev != null) {
+        list = prev;
+      }
+    }
+    list.add(new WeakReference<ByteBuffer>(buf));
+  }
+
+  /**
+   * Return the number of available buffers of a given size.
+   * This is used only for tests.
+   *
+   * <p>The count may include references whose buffers have already been
+   * garbage collected but not yet discarded by {@link #getBuffer(int)}.
+   * Computing it walks the whole queue.
+   *
+   * @param size the buffer capacity to look up, in bytes
+   * @return the number of queued entries for {@code size}
+   */
+  @VisibleForTesting
+  int countBuffersOfSize(int size) {
+    Queue<WeakReference<ByteBuffer>> list = buffersBySize.get(size);
+    if (list == null) {
+      return 0;
+    }
+
+    return list.size();
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestDirectBufferPool.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestDirectBufferPool.java?rev=1613006&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestDirectBufferPool.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestDirectBufferPool.java Thu Jul 24 06:22:02 2014
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assume.assumeTrue;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests for {@link DirectBufferPool}.
+ */
+public class TestDirectBufferPool {
+  final DirectBufferPool pool = new DirectBufferPool();
+
+  @Test
+  public void testBasics() {
+    ByteBuffer a = pool.getBuffer(100);
+    assertEquals(100, a.capacity());
+    assertEquals(100, a.remaining());
+    pool.returnBuffer(a);
+
+    // Getting a new buffer should return the same one
+    ByteBuffer b = pool.getBuffer(100);
+    assertSame(a, b);
+
+    // Getting a new buffer before returning "b" should
+    // not return the same one
+    ByteBuffer c = pool.getBuffer(100);
+    assertNotSame(b, c);
+    pool.returnBuffer(b);
+    pool.returnBuffer(c);
+  }
+
+  @Test
+  public void testBuffersAreReset() {
+    ByteBuffer a = pool.getBuffer(100);
+    a.putInt(0xdeadbeef);
+    assertEquals(96, a.remaining());
+    pool.returnBuffer(a);
+
+    // Even though we return the same buffer,
+    // its position should be reset to 0
+    ByteBuffer b = pool.getBuffer(100);
+    assertSame(a, b);
+    assertEquals(100, b.remaining());
+    pool.returnBuffer(b);
+  }
+
+  @Test
+  public void testWeakRefClearing() throws InterruptedException {
+    // Allocate and return 10 buffers. Once fillPool() returns, only weak
+    // references (the pool's and our probes) point at them.
+    List<WeakReference<ByteBuffer>> probes = fillPool(10, 100);
+    assertEquals(10, pool.countBuffersOfSize(100));
+
+    // System.gc() is only a hint (and a no-op under
+    // -XX:+DisableExplicitGC), so wait until the probes show the buffers
+    // were collected, and skip rather than fail if that never happens.
+    assumeTrue(awaitCleared(probes));
+
+    ByteBuffer buf = pool.getBuffer(100);
+    // the act of getting a buffer should clear all the nulled
+    // references from the pool.
+    assertEquals(0, pool.countBuffersOfSize(100));
+    pool.returnBuffer(buf);
+  }
+
+  /**
+   * Gets {@code count} buffers of {@code size} bytes from the pool, then
+   * returns them all. Kept in its own method so that no local variable in
+   * the calling test's frame keeps any of the buffers strongly reachable.
+   *
+   * @return weak references to the returned buffers, for observing GC
+   */
+  private List<WeakReference<ByteBuffer>> fillPool(int count, int size) {
+    List<ByteBuffer> bufs = Lists.newArrayList();
+    List<WeakReference<ByteBuffer>> probes = Lists.newArrayList();
+    for (int i = 0; i < count; i++) {
+      ByteBuffer buf = pool.getBuffer(size);
+      bufs.add(buf);
+      probes.add(new WeakReference<ByteBuffer>(buf));
+    }
+    for (ByteBuffer buf : bufs) {
+      pool.returnBuffer(buf);
+    }
+    return probes;
+  }
+
+  /**
+   * Requests garbage collection until every probe has been cleared.
+   *
+   * @return true once all probes are cleared, false if some are still
+   *         set after a bounded number of attempts
+   */
+  private static boolean awaitCleared(List<WeakReference<ByteBuffer>> probes)
+      throws InterruptedException {
+    for (int attempt = 0; attempt < 50; attempt++) {
+      System.gc();
+      boolean allCleared = true;
+      for (WeakReference<ByteBuffer> probe : probes) {
+        if (probe.get() != null) {
+          allCleared = false;
+          break;
+        }
+      }
+      if (allCleared) {
+        return true;
+      }
+      Thread.sleep(10);
+    }
+    return false;
+  }
+}