You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2014/07/24 08:22:03 UTC
svn commit: r1613006 - in
/hadoop/common/trunk/hadoop-common-project/hadoop-common: CHANGES.txt
src/main/java/org/apache/hadoop/util/DirectBufferPool.java
src/test/java/org/apache/hadoop/util/TestDirectBufferPool.java
Author: todd
Date: Thu Jul 24 06:22:02 2014
New Revision: 1613006
URL: http://svn.apache.org/r1613006
Log:
HADOOP-10882. Move DirectBufferPool into common util. Contributed by Todd Lipcon.
Added:
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DirectBufferPool.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestDirectBufferPool.java
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1613006&r1=1613005&r2=1613006&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Thu Jul 24 06:22:02 2014
@@ -455,6 +455,8 @@ Release 2.6.0 - UNRELEASED
HADOOP-10887. Add XAttrs to ViewFs and make XAttrs + ViewFileSystem
internal dir behavior consistent. (Stephen Chu via wang)
+ HADOOP-10882. Move DirectBufferPool into common util. (todd)
+
OPTIMIZATIONS
BUG FIXES
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DirectBufferPool.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DirectBufferPool.java?rev=1613006&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DirectBufferPool.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DirectBufferPool.java Thu Jul 24 06:22:02 2014
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A simple class for pooling direct ByteBuffers. This is necessary
+ * because Direct Byte Buffers do not take up much space on the heap,
+ * and hence will not trigger GCs on their own. However, they do take
+ * native memory, and thus can cause high memory usage if not pooled.
+ * The pooled instances are referred to only via weak references, allowing
+ * them to be collected when a GC does run.
+ *
+ * This class only does effective pooling when many buffers will be
+ * allocated at the same size. There is no attempt to reuse larger
+ * buffers to satisfy smaller allocations.
+ *
+ * Buffers are grouped by exact capacity: each distinct capacity passed to
+ * {@link #returnBuffer(ByteBuffer)} gets its own queue. No method in this
+ * class removes a queue once it has been created, so the number of map
+ * entries grows with the number of distinct capacities ever returned.
+ *
+ * All shared state lives in a ConcurrentHashMap of ConcurrentLinkedQueues
+ * and no locks are taken.
+ * NOTE(review): this looks intended for use from multiple threads; confirm
+ * against the callers before relying on it.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
+@InterfaceStability.Evolving
+public class DirectBufferPool {
+
+ // Essentially a multimap with weak values: buffer capacity in bytes -> queue of weak refs to pooled buffers.
+ final ConcurrentMap<Integer, Queue<WeakReference<ByteBuffer>>> buffersBySize =
+ new ConcurrentHashMap<Integer, Queue<WeakReference<ByteBuffer>>>();
+
+ /**
+ * Allocate a direct buffer of the specified size, in bytes.
+ * If a pooled buffer is available, returns that. Otherwise
+ * allocates a new one.
+ *
+ * Only buffers previously returned with exactly this capacity are
+ * candidates for reuse. Weak references whose buffer has already been
+ * garbage collected are polled off the queue and discarded on the way.
+ * A reused buffer was cleared when it was returned (position 0, limit
+ * equal to capacity), but its old contents are not erased.
+ *
+ * @param size the capacity of the requested buffer, in bytes
+ * @return a pooled buffer of that capacity if a live one is queued,
+ * otherwise a newly allocated direct buffer
+ */
+ public ByteBuffer getBuffer(int size) {
+ Queue<WeakReference<ByteBuffer>> list = buffersBySize.get(size);
+ if (list == null) {
+ // no queue has ever been created for this size, so nothing can be pooled
+ return ByteBuffer.allocateDirect(size);
+ }
+
+ WeakReference<ByteBuffer> ref;
+ while ((ref = list.poll()) != null) { // poll() removes the head, so each queued ref is handed out at most once
+ ByteBuffer b = ref.get();
+ if (b != null) { // a null referent means the GC already reclaimed that buffer; keep polling
+ return b;
+ }
+ }
+
+ return ByteBuffer.allocateDirect(size); // queue drained without finding a live buffer
+ }
+
+ /**
+ * Return a buffer into the pool. After being returned,
+ * the buffer may be recycled, so the user must not
+ * continue to use it in any way.
+ *
+ * The buffer is cleared and then queued, behind a weak reference,
+ * under its capacity. Nothing here checks that the buffer is direct
+ * or that it originally came from this pool.
+ *
+ * @param buf the buffer to return
+ */
+ public void returnBuffer(ByteBuffer buf) {
+ buf.clear(); // resets position, limit and mark; the bytes themselves are left as they are
+ int size = buf.capacity();
+ Queue<WeakReference<ByteBuffer>> list = buffersBySize.get(size);
+ if (list == null) {
+ list = new ConcurrentLinkedQueue<WeakReference<ByteBuffer>>();
+ Queue<WeakReference<ByteBuffer>> prev = buffersBySize.putIfAbsent(size, list);
+ // a non-null prev means another caller registered a queue first, so use that one and drop ours
+ if (prev != null) {
+ list = prev;
+ }
+ }
+ list.add(new WeakReference<ByteBuffer>(buf)); // only a weak ref is stored, so the pool alone does not keep the buffer alive
+ }
+
+ /**
+ * Return the number of available buffers of a given size.
+ * This is used only for tests.
+ *
+ * The value is the length of the queue for that size, so weak
+ * references already cleared by the GC but not yet polled off by
+ * {@link #getBuffer(int)} are counted as well.
+ *
+ * @param size the buffer capacity, in bytes, to look up
+ * @return the number of queued references, or 0 if no queue exists for that size
+ */
+ @VisibleForTesting
+ int countBuffersOfSize(int size) {
+ Queue<WeakReference<ByteBuffer>> list = buffersBySize.get(size);
+ if (list == null) {
+ return 0;
+ }
+
+ return list.size();
+ }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestDirectBufferPool.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestDirectBufferPool.java?rev=1613006&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestDirectBufferPool.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestDirectBufferPool.java Thu Jul 24 06:22:02 2014
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestDirectBufferPool { // Unit tests for DirectBufferPool: reuse, reset on return, weak-reference clearing
+ final org.apache.hadoop.util.DirectBufferPool pool = new org.apache.hadoop.util.DirectBufferPool(); // instance field, so each test instance gets its own pool
+
+ @Test
+ public void testBasics() { // a returned buffer is handed out again; a buffer still checked out is not
+ ByteBuffer a = pool.getBuffer(100);
+ assertEquals(100, a.capacity());
+ assertEquals(100, a.remaining());
+ pool.returnBuffer(a);
+
+ // a is still strongly referenced here, so the pool should hand back the same instance
+ ByteBuffer b = pool.getBuffer(100);
+ assertSame(a, b);
+
+ // Getting a new buffer before returning b should
+ // not return the same one
+ ByteBuffer c = pool.getBuffer(100);
+ assertNotSame(b, c);
+ pool.returnBuffer(b);
+ pool.returnBuffer(c);
+ }
+
+ @Test
+ public void testBuffersAreReset() { // a buffer written to before being returned comes back with its position reset
+ ByteBuffer a = pool.getBuffer(100);
+ a.putInt(0xdeadbeef); // writes 4 bytes, advancing the position to 4
+ assertEquals(96, a.remaining());
+ pool.returnBuffer(a);
+
+ // Even though we get back the same buffer,
+ // its position should be reset to 0
+ ByteBuffer b = pool.getBuffer(100);
+ assertSame(a, b);
+ assertEquals(100, a.remaining()); // a and b are the same object, so this checks the reused buffer
+ pool.returnBuffer(b);
+ }
+
+ @Test
+ public void testWeakRefClearing() { // pooled buffers with no remaining strong references are dropped after a GC
+ // Allocate and return 10 buffers.
+ List<ByteBuffer> bufs = Lists.newLinkedList();
+ for (int i = 0; i < 10; i++) {
+ ByteBuffer buf = pool.getBuffer(100);
+ bufs.add(buf);
+ }
+
+ for (ByteBuffer buf : bufs) {
+ pool.returnBuffer(buf);
+ }
+
+ assertEquals(10, pool.countBuffersOfSize(100)); // assumes this pool instance started out empty
+
+ // Clear out any references to the buffers, and force
+ // GC. Weak refs should get cleared.
+ bufs.clear();
+ bufs = null;
+ for (int i = 0; i < 3; i++) {
+ System.gc(); // NOTE(review): System.gc() is only a request, so this test depends on the JVM actually collecting
+ }
+
+ ByteBuffer buf = pool.getBuffer(100);
+ // getBuffer polls every cleared reference off the queue before it falls back to
+ // allocating, and buf itself is not returned until after this check, so the count is 0.
+ assertEquals(0, pool.countBuffersOfSize(100));
+ pool.returnBuffer(buf);
+ }
+}
\ No newline at end of file