Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/05/04 21:54:09 UTC

[GitHub] [hadoop] mukund-thakur opened a new pull request, #4263: HADOOP-18105 Implement buffer pooling with weak references

mukund-thakur opened a new pull request, #4263:
URL: https://github.com/apache/hadoop/pull/4263

   ### Description of PR
   Part of HADOOP-18103.
   Required for the vectored IO feature. None of the current buffer pool
   implementations is complete: ElasticByteBufferPool doesn't use
   weak references and could lead to memory leaks, and
   DirectBufferPool doesn't support caller preferences for direct
   or heap buffers and only implements fixed-length buffers.
   
   ### How was this patch tested?
   Added new unit tests and tested through the vectored read API integration test.
   
   
   ### For code changes:
   
   - [ ] Does the title of this PR start with the corresponding JIRA issue id (e.g. 'HADOOP-17799. Your PR title ...')?
   - [ ] Object storage: have the integration tests been executed and the endpoint declared according to the connector-specific documentation?
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the `LICENSE`, `LICENSE-binary`, `NOTICE-binary` files?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org


[GitHub] [hadoop] mukund-thakur commented on a diff in pull request #4263: HADOOP-18105 Implement buffer pooling with weak references

Posted by GitBox <gi...@apache.org>.
mukund-thakur commented on code in PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#discussion_r882014318


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+
+/**
+ * Buffer pool implementation which uses weak references to store
+ * buffers in the pool, such that they are garbage collected when
+ * there are no references to the buffer during a gc run. This is
+ * important as direct buffers don't get garbage collected automatically
+ * during a gc run as they are not stored on heap memory.
+ * Also the buffers are stored in a tree map which helps in returning
+ * smallest buffer whose size is just greater than requested length.
+ * This is a thread safe implementation.
+ */
+public final class WeakReferencedElasticByteBufferPool extends ElasticByteBufferPool {
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> directBuffers =
+          new TreeMap<>();
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> heapBuffers =
+          new TreeMap<>();
+
+  private TreeMap<Key, WeakReference<ByteBuffer>> getBufferTree(boolean isDirect) {
+    return isDirect
+            ? directBuffers
+            : heapBuffers;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param direct whether we want a direct byte buffer or a heap one.
+   * @param length length of requested buffer.
+   * @return a pooled buffer whose capacity is equal to or the next
+   * greater than the requested length, if one is available and has not
+   * been garbage collected; otherwise a newly allocated buffer.
+   */
+  @Override
+  public synchronized ByteBuffer getBuffer(boolean direct, int length) {
+    TreeMap<Key, WeakReference<ByteBuffer>> buffersTree = getBufferTree(direct);
+
+    // Scan the entire tree and remove all weak null references.
+    buffersTree.entrySet().removeIf(next -> next.getValue().get() == null);
+
+    Map.Entry<Key, WeakReference<ByteBuffer>> entry =
+            buffersTree.ceilingEntry(new Key(length, 0));
+    // If there is no buffer present in the pool with desired size.
+    if (entry == null) {
+      return direct ? ByteBuffer.allocateDirect(length) :
+                      ByteBuffer.allocate(length);
+    }
+    // buffer is available in the pool and not garbage collected.
+    WeakReference<ByteBuffer> bufferInPool = entry.getValue();
+    buffersTree.remove(entry.getKey());
+    ByteBuffer buffer = bufferInPool.get();
+    if (buffer != null) {
+      return buffer;
+    }
+    // buffer was in pool but already got garbage collected.
+    return direct ? ByteBuffer.allocateDirect(length) :
+            ByteBuffer.allocate(length);
+  }
+
+  /**
+   * Return buffer to the pool.
+   * @param buffer buffer to be returned.
+   */
+  @Override
+  public synchronized void putBuffer(ByteBuffer buffer) {
+    buffer.clear();
+    TreeMap<Key, WeakReference<ByteBuffer>> buffersTree = getBufferTree(buffer.isDirect());
+    // Buffers are indexed by (capacity, time).
+    // If our key is not unique on the first try, we try again, since the
+    // time will be different.  Since we use nanoseconds, it's pretty
+    // unlikely that we'll loop even once, unless the system clock has a
+    // poor granularity or multi-socket systems have clocks slightly out
+    // of sync.
+    while (true) {
+      Key keyToInsert = new Key(buffer.capacity(), System.nanoTime());
+      if (!buffersTree.containsKey(keyToInsert)) {
+        buffersTree.put(keyToInsert, new WeakReference<>(buffer));
+        return;
+      }
+    }

Review Comment:
   Sorry, I didn't get this comment. Which buffer is not found?
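
For readers following the `(capacity, time)` indexing discussed in the diff above, here is a minimal stand-alone sketch of how a `TreeMap` with such a key hands back the smallest buffer that fits, oldest first among equal capacities. It is not the Hadoop class: `CapacityOrderedPoolSketch` and `SketchKey` are hypothetical names, a sequence counter stands in for `System.nanoTime()` to keep the ordering deterministic, and buffers are held strongly (no `WeakReference`) so that only the lookup logic is in view.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-alone sketch; not the Hadoop implementation. */
final class CapacityOrderedPoolSketch {

  /** Hypothetical key: ordered by capacity, then by insertion sequence. */
  static final class SketchKey implements Comparable<SketchKey> {
    final int capacity;
    final long sequence;

    SketchKey(int capacity, long sequence) {
      this.capacity = capacity;
      this.sequence = sequence;
    }

    @Override
    public int compareTo(SketchKey other) {
      int byCapacity = Integer.compare(capacity, other.capacity);
      return byCapacity != 0 ? byCapacity : Long.compare(sequence, other.sequence);
    }
  }

  private final TreeMap<SketchKey, ByteBuffer> buffers = new TreeMap<>();

  // Assumption: a counter replaces the nanosecond timestamp, so keys are
  // always unique and no retry loop is needed. Starts at 1 so that a probe
  // key with sequence 0 sorts before every stored key of the same capacity.
  private long nextSequence = 1;

  /** Return a buffer to the pool after resetting its position and limit. */
  synchronized void put(ByteBuffer buffer) {
    buffer.clear();
    buffers.put(new SketchKey(buffer.capacity(), nextSequence++), buffer);
  }

  /** Take the smallest pooled buffer with capacity >= length, else allocate. */
  synchronized ByteBuffer get(int length) {
    Map.Entry<SketchKey, ByteBuffer> entry =
        buffers.ceilingEntry(new SketchKey(length, 0));
    if (entry == null) {
      return ByteBuffer.allocate(length);
    }
    buffers.remove(entry.getKey());
    return entry.getValue();
  }

  synchronized int size() {
    return buffers.size();
  }

  public static void main(String[] args) {
    CapacityOrderedPoolSketch pool = new CapacityOrderedPoolSketch();
    pool.put(ByteBuffer.allocate(10));
    pool.put(ByteBuffer.allocate(15));
    // A request for 12 bytes skips the 10-byte buffer and takes the 15-byte one.
    System.out.println("capacity for 12: " + pool.get(12).capacity());
    System.out.println("buffers left: " + pool.size());
  }
}
```

Probing with sequence 0 is what lets `ceilingEntry` land on the oldest entry of the first sufficient capacity; a weak-reference variant would additionally have to handle entries whose referent has already been cleared.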





[GitHub] [hadoop] hadoop-yetus commented on pull request #4263: HADOOP-18105 Implement buffer pooling with weak references

Posted by GitBox <gi...@apache.org>.
hadoop-yetus commented on PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#issuecomment-1118088293

   :broken_heart: **-1 overall**
   
   
   
   
   
   
   | Vote | Subsystem | Runtime |  Logfile | Comment |
   |:----:|----------:|--------:|:--------:|:-------:|
   | +0 :ok: |  reexec  |   1m  1s |  |  Docker mode activated.  |
   |||| _ Prechecks _ |
   | +1 :green_heart: |  dupname  |   0m  0s |  |  No case conflicting files found.  |
   | +0 :ok: |  codespell  |   0m  0s |  |  codespell was not available.  |
   | +1 :green_heart: |  @author  |   0m  0s |  |  The patch does not contain any @author tags.  |
   | +1 :green_heart: |  test4tests  |   0m  0s |  |  The patch appears to include 2 new or modified test files.  |
   |||| _ feature-vectored-io Compile Tests _ |
   | +1 :green_heart: |  mvninstall  |  42m  8s |  |  feature-vectored-io passed  |
   | +1 :green_heart: |  compile  |  25m  2s |  |  feature-vectored-io passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1  |
   | +1 :green_heart: |  compile  |  21m 35s |  |  feature-vectored-io passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07  |
   | +1 :green_heart: |  checkstyle  |   1m 32s |  |  feature-vectored-io passed  |
   | +1 :green_heart: |  mvnsite  |   1m 59s |  |  feature-vectored-io passed  |
   | -1 :x: |  javadoc  |   1m 37s | [/branch-javadoc-hadoop-common-project_hadoop-common-jdkPrivateBuild-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.txt](https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/1/artifact/out/branch-javadoc-hadoop-common-project_hadoop-common-jdkPrivateBuild-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.txt) |  hadoop-common in feature-vectored-io failed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.  |
   | +1 :green_heart: |  javadoc  |   2m  4s |  |  feature-vectored-io passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07  |
   | +1 :green_heart: |  spotbugs  |   3m  2s |  |  feature-vectored-io passed  |
   | +1 :green_heart: |  shadedclient  |  26m  5s |  |  branch has no errors when building and testing our client artifacts.  |
   |||| _ Patch Compile Tests _ |
   | +1 :green_heart: |  mvninstall  |   1m  6s |  |  the patch passed  |
   | +1 :green_heart: |  compile  |  24m 13s |  |  the patch passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1  |
   | +1 :green_heart: |  javac  |  24m 13s |  |  the patch passed  |
   | +1 :green_heart: |  compile  |  21m 39s |  |  the patch passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07  |
   | +1 :green_heart: |  javac  |  21m 39s |  |  the patch passed  |
   | -1 :x: |  blanks  |   0m  0s | [/blanks-eol.txt](https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/1/artifact/out/blanks-eol.txt) |  The patch has 1 line(s) that end in blanks. Use git apply --whitespace=fix <<patch_file>>. Refer https://git-scm.com/docs/git-apply  |
   | -0 :warning: |  checkstyle  |   1m 25s | [/results-checkstyle-hadoop-common-project_hadoop-common.txt](https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/1/artifact/out/results-checkstyle-hadoop-common-project_hadoop-common.txt) |  hadoop-common-project/hadoop-common: The patch generated 1 new + 1 unchanged - 0 fixed = 2 total (was 1)  |
   | +1 :green_heart: |  mvnsite  |   1m 57s |  |  the patch passed  |
   | -1 :x: |  javadoc  |   1m 27s | [/patch-javadoc-hadoop-common-project_hadoop-common-jdkPrivateBuild-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.txt](https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/1/artifact/out/patch-javadoc-hadoop-common-project_hadoop-common-jdkPrivateBuild-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.txt) |  hadoop-common in the patch failed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.  |
   | +1 :green_heart: |  javadoc  |   2m  6s |  |  the patch passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07  |
   | +1 :green_heart: |  spotbugs  |   3m  1s |  |  the patch passed  |
   | +1 :green_heart: |  shadedclient  |  26m 26s |  |  patch has no errors when building and testing our client artifacts.  |
   |||| _ Other Tests _ |
   | +1 :green_heart: |  unit  |  18m 11s |  |  hadoop-common in the patch passed.  |
   | +1 :green_heart: |  asflicense  |   1m 18s |  |  The patch does not generate ASF License warnings.  |
   |  |   | 229m 15s |  |  |
   
   
   | Subsystem | Report/Notes |
   |----------:|:-------------|
   | Docker | ClientAPI=1.41 ServerAPI=1.41 base: https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/1/artifact/out/Dockerfile |
   | GITHUB PR | https://github.com/apache/hadoop/pull/4263 |
   | Optional Tests | dupname asflicense compile javac javadoc mvninstall mvnsite unit shadedclient spotbugs checkstyle codespell |
   | uname | Linux 32b0a4c29b2d 4.15.0-175-generic #184-Ubuntu SMP Thu Mar 24 17:48:36 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux |
   | Build tool | maven |
   | Personality | dev-support/bin/hadoop.sh |
   | git revision | feature-vectored-io / 1c2a013c3bcdb96f3501ab6a408ece30347f9875 |
   | Default Java | Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
   | Multi-JDK versions | /usr/lib/jvm/java-11-openjdk-amd64:Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 /usr/lib/jvm/java-8-openjdk-amd64:Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
   |  Test Results | https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/1/testReport/ |
   | Max. process+thread count | 2723 (vs. ulimit of 5500) |
   | modules | C: hadoop-common-project/hadoop-common U: hadoop-common-project/hadoop-common |
   | Console output | https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/1/console |
   | versions | git=2.25.1 maven=3.6.3 spotbugs=4.2.2 |
   | Powered by | Apache Yetus 0.14.0-SNAPSHOT https://yetus.apache.org |
   
   
   This message was automatically generated.
   
   




[GitHub] [hadoop] steveloughran commented on a diff in pull request #4263: HADOOP-18105 Implement buffer pooling with weak references

Posted by GitBox <gi...@apache.org>.
steveloughran commented on code in PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#discussion_r875761159


##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Unit tests for {@code WeakReferencedElasticByteBufferPool}.
+ */
+@RunWith(Parameterized.class)
+public class TestWeakReferencedElasticByteBufferPool {
+
+  private final boolean isDirect;
+
+  private final String type;
+
+  @Parameterized.Parameters(name = "Buffer type : {0}")
+  public static List<String> params() {
+    return Arrays.asList("direct", "array");
+  }
+
+  public TestWeakReferencedElasticByteBufferPool(String type) {
+    this.type = type;
+    this.isDirect = !"array".equals(type);
+  }
+
+  // Add more tests for different time and same size buffers in the pool. 
+  @Test
+  public void testGetAndPutBasic() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    int bufferSize = 5;
+    ByteBuffer buffer = pool.getBuffer(isDirect, bufferSize);
+    Assertions.assertThat(buffer.isDirect())
+            .describedAs("Buffer returned should be of correct type %s", type)
+            .isEqualTo(isDirect);
+    Assertions.assertThat(buffer.capacity())
+            .describedAs("Initial capacity of returned buffer from pool")
+            .isEqualTo(bufferSize);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Initial position of returned buffer from pool")
+            .isEqualTo(0);
+
+    byte[] arr = createByteArray(bufferSize);
+    buffer.put(arr, 0, arr.length);
+    buffer.flip();
+    validateBufferContent(buffer, arr);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Buffer's position after filling bytes in it")
+            .isEqualTo(bufferSize);
+    // releasing buffer to the pool.
+    pool.putBuffer(buffer);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Position should be reset to 0 after returning buffer to the pool")
+            .isEqualTo(0);
+
+  }
+
+  @Test
+  public void testPoolingWithDifferentSizes() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 5);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 15);
+
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    ByteBuffer buffer3 = pool.getBuffer(isDirect, 12);
+    Assertions.assertThat(buffer3.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(15);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(1);
+    pool.putBuffer(buffer);
+    ByteBuffer buffer4 = pool.getBuffer(isDirect, 6);
+    Assertions.assertThat(buffer4.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(10);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(1);
+
+    pool.release();
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool post release")
+            .isEqualTo(0);
+  }
+
+  @Test
+  public void testPoolingWithDifferentInsertionTime() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 10);
+
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    ByteBuffer buffer3 = pool.getBuffer(isDirect, 10);
+    // As buffer1 is returned to the pool before buffer2, it should
+    // be returned when buffer of same size is asked again from
+    // the pool.
+    Assertions.assertThat(buffer3 == buffer1)
+            .describedAs("Buffers should be returned in order of their " +
+                    "insertion time")
+            .isTrue();
+    pool.putBuffer(buffer);
+    ByteBuffer buffer4 = pool.getBuffer(isDirect, 10);
+    Assertions.assertThat(buffer4 == buffer2)
+            .describedAs("Buffers should be returned in order of their " +
+                    "insertion time")
+            .isTrue();
+  }
+
+  @Test
+  public void testGarbageCollection() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 5);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 15);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    // Before GC.
+    ByteBuffer buffer4 = pool.getBuffer(isDirect, 12);
+    Assertions.assertThat(buffer4.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(15);
+
+    // Removing the references
+    buffer1 = null;
+    buffer2 = null;
+    System.gc();

Review Comment:
   ok
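
A note on the `System.gc()` call in the test above: the JDK documents it only as a request, so a weak reference is not guaranteed to be cleared by the time it returns. One common way to make such a check less timing-sensitive is to poll with a timeout. The sketch below assumes hypothetical names (`WeakReferenceGcSketch`, `awaitCleared`) and is not taken from the patch.

```java
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;

/** Hypothetical helper sketch; not part of the patch under review. */
final class WeakReferenceGcSketch {

  /**
   * Repeatedly requests a GC until the referent is cleared or the timeout
   * expires. Returns true only if the weak reference was cleared in time.
   */
  static boolean awaitCleared(WeakReference<?> ref, long timeoutMillis) {
    long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
    while (ref.get() != null) {
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      System.gc(); // only a hint; the JVM is free to ignore it
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    ByteBuffer strong = ByteBuffer.allocate(16);
    WeakReference<ByteBuffer> weak = new WeakReference<>(strong);

    // While a strong reference is still in use, the referent cannot be cleared.
    System.out.println("cleared while strongly held: " + awaitCleared(weak, 100));
    System.out.println("capacity still readable: " + strong.capacity());

    // After dropping the strong reference, clearing is likely but not guaranteed.
    strong = null;
    System.out.println("cleared after dropping reference: " + awaitCleared(weak, 2000));
  }
}
```

Even with polling, the "cleared" outcome depends on the collector in use, so assertions that rely on it are best treated as probabilistic.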





[GitHub] [hadoop] mukund-thakur commented on pull request #4263: HADOOP-18105 Implement buffer pooling with weak references

Posted by GitBox <gi...@apache.org>.
mukund-thakur commented on PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#issuecomment-1121476407

   CC @mehakmeet  @steveloughran 




[GitHub] [hadoop] hadoop-yetus commented on pull request #4263: HADOOP-18105 Implement buffer pooling with weak references

Posted by GitBox <gi...@apache.org>.
hadoop-yetus commented on PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#issuecomment-1118088382

   :broken_heart: **-1 overall**
   
   
   
   
   
   
   | Vote | Subsystem | Runtime |  Logfile | Comment |
   |:----:|----------:|--------:|:--------:|:-------:|
   | +0 :ok: |  reexec  |   0m 53s |  |  Docker mode activated.  |
   |||| _ Prechecks _ |
   | +1 :green_heart: |  dupname  |   0m  0s |  |  No case conflicting files found.  |
   | +0 :ok: |  codespell  |   0m  1s |  |  codespell was not available.  |
   | +1 :green_heart: |  @author  |   0m  0s |  |  The patch does not contain any @author tags.  |
   | +1 :green_heart: |  test4tests  |   0m  0s |  |  The patch appears to include 2 new or modified test files.  |
   |||| _ feature-vectored-io Compile Tests _ |
   | +1 :green_heart: |  mvninstall  |  41m 43s |  |  feature-vectored-io passed  |
   | +1 :green_heart: |  compile  |  25m 11s |  |  feature-vectored-io passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1  |
   | +1 :green_heart: |  compile  |  21m 43s |  |  feature-vectored-io passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07  |
   | +1 :green_heart: |  checkstyle  |   1m 30s |  |  feature-vectored-io passed  |
   | +1 :green_heart: |  mvnsite  |   1m 58s |  |  feature-vectored-io passed  |
   | -1 :x: |  javadoc  |   1m 37s | [/branch-javadoc-hadoop-common-project_hadoop-common-jdkPrivateBuild-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.txt](https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/2/artifact/out/branch-javadoc-hadoop-common-project_hadoop-common-jdkPrivateBuild-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.txt) |  hadoop-common in feature-vectored-io failed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.  |
   | +1 :green_heart: |  javadoc  |   1m 58s |  |  feature-vectored-io passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07  |
   | +1 :green_heart: |  spotbugs  |   3m  5s |  |  feature-vectored-io passed  |
   | +1 :green_heart: |  shadedclient  |  25m 59s |  |  branch has no errors when building and testing our client artifacts.  |
   |||| _ Patch Compile Tests _ |
   | +1 :green_heart: |  mvninstall  |   1m  8s |  |  the patch passed  |
   | +1 :green_heart: |  compile  |  24m 21s |  |  the patch passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1  |
   | +1 :green_heart: |  javac  |  24m 21s |  |  the patch passed  |
   | +1 :green_heart: |  compile  |  21m 41s |  |  the patch passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07  |
   | +1 :green_heart: |  javac  |  21m 41s |  |  the patch passed  |
   | -1 :x: |  blanks  |   0m  0s | [/blanks-eol.txt](https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/2/artifact/out/blanks-eol.txt) |  The patch has 1 line(s) that end in blanks. Use git apply --whitespace=fix <<patch_file>>. Refer https://git-scm.com/docs/git-apply  |
   | +1 :green_heart: |  checkstyle  |   1m 24s |  |  the patch passed  |
   | +1 :green_heart: |  mvnsite  |   1m 56s |  |  the patch passed  |
   | -1 :x: |  javadoc  |   1m 28s | [/patch-javadoc-hadoop-common-project_hadoop-common-jdkPrivateBuild-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.txt](https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/2/artifact/out/patch-javadoc-hadoop-common-project_hadoop-common-jdkPrivateBuild-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.txt) |  hadoop-common in the patch failed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.  |
   | +1 :green_heart: |  javadoc  |   2m  0s |  |  the patch passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07  |
   | +1 :green_heart: |  spotbugs  |   3m  5s |  |  the patch passed  |
   | +1 :green_heart: |  shadedclient  |  26m  9s |  |  patch has no errors when building and testing our client artifacts.  |
   |||| _ Other Tests _ |
   | +1 :green_heart: |  unit  |  18m 15s |  |  hadoop-common in the patch passed.  |
   | +1 :green_heart: |  asflicense  |   1m 16s |  |  The patch does not generate ASF License warnings.  |
   |  |   | 228m 49s |  |  |
   
   
   | Subsystem | Report/Notes |
   |----------:|:-------------|
   | Docker | ClientAPI=1.41 ServerAPI=1.41 base: https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/2/artifact/out/Dockerfile |
   | GITHUB PR | https://github.com/apache/hadoop/pull/4263 |
   | Optional Tests | dupname asflicense compile javac javadoc mvninstall mvnsite unit shadedclient spotbugs checkstyle codespell |
   | uname | Linux f203364fe5e9 4.15.0-175-generic #184-Ubuntu SMP Thu Mar 24 17:48:36 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux |
   | Build tool | maven |
   | Personality | dev-support/bin/hadoop.sh |
   | git revision | feature-vectored-io / 1c2a013c3bcdb96f3501ab6a408ece30347f9875 |
   | Default Java | Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
   | Multi-JDK versions | /usr/lib/jvm/java-11-openjdk-amd64:Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 /usr/lib/jvm/java-8-openjdk-amd64:Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
   |  Test Results | https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/2/testReport/ |
   | Max. process+thread count | 1243 (vs. ulimit of 5500) |
   | modules | C: hadoop-common-project/hadoop-common U: hadoop-common-project/hadoop-common |
   | Console output | https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/2/console |
   | versions | git=2.25.1 maven=3.6.3 spotbugs=4.2.2 |
   | Powered by | Apache Yetus 0.14.0-SNAPSHOT https://yetus.apache.org |
   
   
   This message was automatically generated.
   
   




[GitHub] [hadoop] mukund-thakur merged pull request #4263: HADOOP-18105 Implement buffer pooling with weak references

Posted by GitBox <gi...@apache.org>.
mukund-thakur merged PR #4263:
URL: https://github.com/apache/hadoop/pull/4263




[GitHub] [hadoop] mukund-thakur commented on a diff in pull request #4263: HADOOP-18105 Implement buffer pooling with weak references

Posted by GitBox <gi...@apache.org>.
mukund-thakur commented on code in PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#discussion_r875361776


##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Unit tests for {@code WeakReferencedElasticByteBufferPool}.
+ */
+@RunWith(Parameterized.class)
+public class TestWeakReferencedElasticByteBufferPool {
+
+  private final boolean isDirect;
+
+  private final String type;
+
+  @Parameterized.Parameters(name = "Buffer type : {0}")
+  public static List<String> params() {
+    return Arrays.asList("direct", "array");
+  }
+
+  public TestWeakReferencedElasticByteBufferPool(String type) {
+    this.type = type;
+    this.isDirect = !"array".equals(type);
+  }
+
+  // Add more tests for different time and same size buffers in the pool. 
+  @Test
+  public void testGetAndPutBasic() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    int bufferSize = 5;
+    ByteBuffer buffer = pool.getBuffer(isDirect, bufferSize);
+    Assertions.assertThat(buffer.isDirect())
+            .describedAs("Buffer returned should be of correct type %s", type)
+            .isEqualTo(isDirect);
+    Assertions.assertThat(buffer.capacity())
+            .describedAs("Initial capacity of returned buffer from pool")
+            .isEqualTo(bufferSize);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Initial position of returned buffer from pool")
+            .isEqualTo(0);
+
+    byte[] arr = createByteArray(bufferSize);
+    buffer.put(arr, 0, arr.length);
+    buffer.flip();
+    validateBufferContent(buffer, arr);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Buffer's position after filling bytes in it")
+            .isEqualTo(bufferSize);
+    // releasing buffer to the pool.
+    pool.putBuffer(buffer);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Position should be reset to 0 after returning buffer to the pool")
+            .isEqualTo(0);
+
+  }
+
+  @Test
+  public void testPoolingWithDifferentSizes() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 5);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 15);
+
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    ByteBuffer buffer3 = pool.getBuffer(isDirect, 12);
+    Assertions.assertThat(buffer3.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(15);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(1);
+    pool.putBuffer(buffer);
+    ByteBuffer buffer4 = pool.getBuffer(isDirect, 6);
+    Assertions.assertThat(buffer4.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(10);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(1);
+
+    pool.release();
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool post release")
+            .isEqualTo(0);
+  }
+
+  @Test
+  public void testPoolingWithDifferentInsertionTime() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 10);
+
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    ByteBuffer buffer3 = pool.getBuffer(isDirect, 10);
+    // As buffer1 is returned to the pool before buffer2, it should
+    // be returned when buffer of same size is asked again from
+    // the pool.
+    Assertions.assertThat(buffer3 == buffer1)
+            .describedAs("Buffers should be returned in order of their " +
+                    "insertion time")
+            .isTrue();
+    pool.putBuffer(buffer);
+    ByteBuffer buffer4 = pool.getBuffer(isDirect, 10);
+    Assertions.assertThat(buffer4 == buffer2)
+            .describedAs("Buffers should be returned in order of their " +
+                    "insertion time")
+            .isTrue();
+  }
+
+  @Test
+  public void testGarbageCollection() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 5);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 15);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    // Before GC.
+    ByteBuffer buffer4 = pool.getBuffer(isDirect, 12);
+    Assertions.assertThat(buffer4.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(15);
+
+    // Removing the references
+    buffer1 = null;
+    buffer2 = null;
+    System.gc();

Review Comment:
   Oh, I see. I just revisited your code and see many managers being created in the test.
   For me it is working fine, though, giving consistent results every time.
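
For context, `System.gc()` is only a request to the JVM, so a test that waits for weak references to be cleared may behave differently from one environment to another. A minimal sketch of a polling helper follows; the class `GcAwaitSketch` and the method `awaitCleared` are hypothetical names used for illustration and are not part of this patch.

```java
import java.lang.ref.WeakReference;

public class GcAwaitSketch {

  /**
   * Hypothetical helper: request GC up to {@code attempts} times and
   * report whether the weak reference was cleared in that window.
   * The result is best effort, because System.gc() is only a hint.
   */
  public static boolean awaitCleared(WeakReference<?> ref, int attempts) {
    for (int i = 0; i < attempts; i++) {
      if (ref.get() == null) {
        return true;
      }
      System.gc();
      try {
        // Give the collector a short window to run.
        Thread.sleep(10);
      } catch (InterruptedException e) {
        // Restore the interrupt flag and stop polling.
        Thread.currentThread().interrupt();
        break;
      }
    }
    return ref.get() == null;
  }
}
```

A test could call such a helper after dropping its strong references and assert on the returned flag, rather than assuming a single `System.gc()` call is sufficient.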





[GitHub] [hadoop] hadoop-yetus commented on pull request #4263: HADOOP-18105 Implement buffer pooling with weak references

Posted by GitBox <gi...@apache.org>.
hadoop-yetus commented on PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#issuecomment-1129536677

   :broken_heart: **-1 overall**
   
   
   
   
   
   
   | Vote | Subsystem | Runtime |  Logfile | Comment |
   |:----:|----------:|--------:|:--------:|:-------:|
   | +0 :ok: |  reexec  |   0m 55s |  |  Docker mode activated.  |
   |||| _ Prechecks _ |
   | +1 :green_heart: |  dupname  |   0m  0s |  |  No case conflicting files found.  |
   | +0 :ok: |  codespell  |   0m  1s |  |  codespell was not available.  |
   | +1 :green_heart: |  @author  |   0m  0s |  |  The patch does not contain any @author tags.  |
   | +1 :green_heart: |  test4tests  |   0m  0s |  |  The patch appears to include 2 new or modified test files.  |
   |||| _ feature-vectored-io Compile Tests _ |
   | +1 :green_heart: |  mvninstall  |  40m 27s |  |  feature-vectored-io passed  |
   | +1 :green_heart: |  compile  |  24m 59s |  |  feature-vectored-io passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1  |
   | +1 :green_heart: |  compile  |  21m 42s |  |  feature-vectored-io passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07  |
   | +1 :green_heart: |  checkstyle  |   1m 31s |  |  feature-vectored-io passed  |
   | +1 :green_heart: |  mvnsite  |   1m 59s |  |  feature-vectored-io passed  |
   | -1 :x: |  javadoc  |   1m 37s | [/branch-javadoc-hadoop-common-project_hadoop-common-jdkPrivateBuild-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.txt](https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/3/artifact/out/branch-javadoc-hadoop-common-project_hadoop-common-jdkPrivateBuild-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.txt) |  hadoop-common in feature-vectored-io failed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.  |
   | +1 :green_heart: |  javadoc  |   2m  2s |  |  feature-vectored-io passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07  |
   | +1 :green_heart: |  spotbugs  |   3m  6s |  |  feature-vectored-io passed  |
   | +1 :green_heart: |  shadedclient  |  26m  5s |  |  branch has no errors when building and testing our client artifacts.  |
   |||| _ Patch Compile Tests _ |
   | +1 :green_heart: |  mvninstall  |   1m  4s |  |  the patch passed  |
   | +1 :green_heart: |  compile  |  24m 16s |  |  the patch passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1  |
   | +1 :green_heart: |  javac  |  24m 16s |  |  the patch passed  |
   | +1 :green_heart: |  compile  |  21m 39s |  |  the patch passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07  |
   | +1 :green_heart: |  javac  |  21m 39s |  |  the patch passed  |
   | +1 :green_heart: |  blanks  |   0m  0s |  |  The patch has no blanks issues.  |
   | +1 :green_heart: |  checkstyle  |   1m 24s |  |  the patch passed  |
   | +1 :green_heart: |  mvnsite  |   1m 57s |  |  the patch passed  |
   | -1 :x: |  javadoc  |   1m 25s | [/patch-javadoc-hadoop-common-project_hadoop-common-jdkPrivateBuild-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.txt](https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/3/artifact/out/patch-javadoc-hadoop-common-project_hadoop-common-jdkPrivateBuild-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.txt) |  hadoop-common in the patch failed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.  |
   | +1 :green_heart: |  javadoc  |   2m  0s |  |  the patch passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07  |
   | +1 :green_heart: |  spotbugs  |   3m  3s |  |  the patch passed  |
   | +1 :green_heart: |  shadedclient  |  25m 55s |  |  patch has no errors when building and testing our client artifacts.  |
   |||| _ Other Tests _ |
   | +1 :green_heart: |  unit  |  18m 15s |  |  hadoop-common in the patch passed.  |
   | +1 :green_heart: |  asflicense  |   1m 16s |  |  The patch does not generate ASF License warnings.  |
   |  |   | 226m 54s |  |  |
   
   
   | Subsystem | Report/Notes |
   |----------:|:-------------|
   | Docker | ClientAPI=1.41 ServerAPI=1.41 base: https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/3/artifact/out/Dockerfile |
   | GITHUB PR | https://github.com/apache/hadoop/pull/4263 |
   | Optional Tests | dupname asflicense compile javac javadoc mvninstall mvnsite unit shadedclient spotbugs checkstyle codespell |
   | uname | Linux 19fc3f0597f5 4.15.0-175-generic #184-Ubuntu SMP Thu Mar 24 17:48:36 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux |
   | Build tool | maven |
   | Personality | dev-support/bin/hadoop.sh |
   | git revision | feature-vectored-io / 12a1925318877133859b7b8ea08754fa9a324bfd |
   | Default Java | Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
   | Multi-JDK versions | /usr/lib/jvm/java-11-openjdk-amd64:Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 /usr/lib/jvm/java-8-openjdk-amd64:Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
   |  Test Results | https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/3/testReport/ |
   | Max. process+thread count | 2802 (vs. ulimit of 5500) |
   | modules | C: hadoop-common-project/hadoop-common U: hadoop-common-project/hadoop-common |
   | Console output | https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/3/console |
   | versions | git=2.25.1 maven=3.6.3 spotbugs=4.2.2 |
   | Powered by | Apache Yetus 0.14.0-SNAPSHOT https://yetus.apache.org |
   
   
   This message was automatically generated.
   
   




[GitHub] [hadoop] hadoop-yetus commented on pull request #4263: HADOOP-18105 Implement buffer pooling with weak references

Posted by GitBox <gi...@apache.org>.
hadoop-yetus commented on PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#issuecomment-1130652549

   :broken_heart: **-1 overall**
   
   
   
   
   
   
   | Vote | Subsystem | Runtime |  Logfile | Comment |
   |:----:|----------:|--------:|:--------:|:-------:|
   | +0 :ok: |  reexec  |   0m 58s |  |  Docker mode activated.  |
   |||| _ Prechecks _ |
   | +1 :green_heart: |  dupname  |   0m  0s |  |  No case conflicting files found.  |
   | +0 :ok: |  codespell  |   0m  0s |  |  codespell was not available.  |
   | +1 :green_heart: |  @author  |   0m  0s |  |  The patch does not contain any @author tags.  |
   | +1 :green_heart: |  test4tests  |   0m  0s |  |  The patch appears to include 2 new or modified test files.  |
   |||| _ feature-vectored-io Compile Tests _ |
   | +1 :green_heart: |  mvninstall  |  42m  5s |  |  feature-vectored-io passed  |
   | +1 :green_heart: |  compile  |  25m  8s |  |  feature-vectored-io passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1  |
   | +1 :green_heart: |  compile  |  21m 34s |  |  feature-vectored-io passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07  |
   | +1 :green_heart: |  checkstyle  |   1m 31s |  |  feature-vectored-io passed  |
   | +1 :green_heart: |  mvnsite  |   2m  0s |  |  feature-vectored-io passed  |
   | -1 :x: |  javadoc  |   1m 37s | [/branch-javadoc-hadoop-common-project_hadoop-common-jdkPrivateBuild-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.txt](https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/4/artifact/out/branch-javadoc-hadoop-common-project_hadoop-common-jdkPrivateBuild-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.txt) |  hadoop-common in feature-vectored-io failed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.  |
   | +1 :green_heart: |  javadoc  |   2m  3s |  |  feature-vectored-io passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07  |
   | +1 :green_heart: |  spotbugs  |   3m  3s |  |  feature-vectored-io passed  |
   | +1 :green_heart: |  shadedclient  |  25m 52s |  |  branch has no errors when building and testing our client artifacts.  |
   |||| _ Patch Compile Tests _ |
   | +1 :green_heart: |  mvninstall  |   1m  4s |  |  the patch passed  |
   | +1 :green_heart: |  compile  |  24m 14s |  |  the patch passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1  |
   | +1 :green_heart: |  javac  |  24m 14s |  |  the patch passed  |
   | +1 :green_heart: |  compile  |  21m 34s |  |  the patch passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07  |
   | +1 :green_heart: |  javac  |  21m 34s |  |  the patch passed  |
   | +1 :green_heart: |  blanks  |   0m  0s |  |  The patch has no blanks issues.  |
   | -0 :warning: |  checkstyle  |   1m 24s | [/results-checkstyle-hadoop-common-project_hadoop-common.txt](https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/4/artifact/out/results-checkstyle-hadoop-common-project_hadoop-common.txt) |  hadoop-common-project/hadoop-common: The patch generated 1 new + 1 unchanged - 0 fixed = 2 total (was 1)  |
   | +1 :green_heart: |  mvnsite  |   1m 56s |  |  the patch passed  |
   | -1 :x: |  javadoc  |   1m 28s | [/patch-javadoc-hadoop-common-project_hadoop-common-jdkPrivateBuild-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.txt](https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/4/artifact/out/patch-javadoc-hadoop-common-project_hadoop-common-jdkPrivateBuild-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.txt) |  hadoop-common in the patch failed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.  |
   | +1 :green_heart: |  javadoc  |   1m 58s |  |  the patch passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07  |
   | +1 :green_heart: |  spotbugs  |   3m  3s |  |  the patch passed  |
   | +1 :green_heart: |  shadedclient  |  26m 12s |  |  patch has no errors when building and testing our client artifacts.  |
   |||| _ Other Tests _ |
   | +1 :green_heart: |  unit  |  18m 20s |  |  hadoop-common in the patch passed.  |
   | +1 :green_heart: |  asflicense  |   1m 16s |  |  The patch does not generate ASF License warnings.  |
   |  |   | 228m 51s |  |  |
   
   
   | Subsystem | Report/Notes |
   |----------:|:-------------|
   | Docker | ClientAPI=1.41 ServerAPI=1.41 base: https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/4/artifact/out/Dockerfile |
   | GITHUB PR | https://github.com/apache/hadoop/pull/4263 |
   | Optional Tests | dupname asflicense compile javac javadoc mvninstall mvnsite unit shadedclient spotbugs checkstyle codespell |
   | uname | Linux ece9748bf6c8 4.15.0-175-generic #184-Ubuntu SMP Thu Mar 24 17:48:36 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux |
   | Build tool | maven |
   | Personality | dev-support/bin/hadoop.sh |
   | git revision | feature-vectored-io / 35652fda1ed903aeaa78fda84b049420cbc470e5 |
   | Default Java | Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
   | Multi-JDK versions | /usr/lib/jvm/java-11-openjdk-amd64:Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 /usr/lib/jvm/java-8-openjdk-amd64:Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
   |  Test Results | https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/4/testReport/ |
   | Max. process+thread count | 3137 (vs. ulimit of 5500) |
   | modules | C: hadoop-common-project/hadoop-common U: hadoop-common-project/hadoop-common |
   | Console output | https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/4/console |
   | versions | git=2.25.1 maven=3.6.3 spotbugs=4.2.2 |
   | Powered by | Apache Yetus 0.14.0-SNAPSHOT https://yetus.apache.org |
   
   
   This message was automatically generated.
   
   




[GitHub] [hadoop] mukund-thakur commented on a diff in pull request #4263: HADOOP-18105 Implement buffer pooling with weak references

Posted by GitBox <gi...@apache.org>.
mukund-thakur commented on code in PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#discussion_r882046723


##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMoreWeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Non parameterized tests for {@code WeakReferencedElasticByteBufferPool}.
+ */
+public class TestMoreWeakReferencedElasticByteBufferPool

Review Comment:
   Null buffers? Is that a valid case? I can add a precondition on the incoming buffer and throw an NPE with a message. Right now, yes, it fails with an NPE.
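
A minimal sketch of the precondition described above, using `java.util.Objects.requireNonNull`. The class `NullCheckSketch`, its static `putBuffer` method and the message text are assumptions for illustration; the actual patch may use a different precondition utility and wording.

```java
import java.nio.ByteBuffer;
import java.util.Objects;

public class NullCheckSketch {

  /**
   * Hypothetical stand-in for a pool's putBuffer(): rejects null
   * with a descriptive message before touching the buffer.
   * Returns the capacity only so the sketch has something to observe.
   */
  public static int putBuffer(ByteBuffer buffer) {
    // Fails fast with a NullPointerException carrying the message.
    Objects.requireNonNull(buffer, "buffer must not be null");
    // Reset position and limit, as a pool would before storing it.
    buffer.clear();
    return buffer.capacity();
  }
}
```

With this shape the failure is still a `NullPointerException`, but it is raised at the method boundary with a message instead of somewhere inside the pool.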





[GitHub] [hadoop] steveloughran commented on a diff in pull request #4263: HADOOP-18105 Implement buffer pooling with weak references

Posted by GitBox <gi...@apache.org>.
steveloughran commented on code in PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#discussion_r880947333


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+
+/**
+ * Buffer pool implementation which uses weak references to store
+ * buffers in the pool, such that they are garbage collected when
+ * there are no references to the buffer during a gc run. This is
+ * important as direct buffers don't get garbage collected automatically
+ * during a gc run as they are not stored on heap memory.
+ * Also the buffers are stored in a tree map which helps in returning
+ * smallest buffer whose size is just greater than requested length.
+ * This is a thread safe implementation.
+ */
+public final class WeakReferencedElasticByteBufferPool extends ElasticByteBufferPool {
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> directBuffers =

Review Comment:
   1. Add javadocs here and below; mention that access must happen inside synchronized blocks.
   2. The field should be of type Map<>, unless it has to be explicitly a tree map.
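
One point relevant to the second item: `ceilingEntry` is declared on `java.util.NavigableMap`, not on `java.util.Map`, so a field that needs it could be typed as `NavigableMap` rather than `TreeMap`. A minimal sketch, assuming a hypothetical `NavigableMapSketch` class with integer keys and string values standing in for the pool's `Key` and `WeakReference<ByteBuffer>`:

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class NavigableMapSketch {

  /**
   * Hypothetical stand-in for the pool's buffer map.
   * Declared by interface; all access must be in synchronized methods.
   */
  private final NavigableMap<Integer, String> buffers = new TreeMap<>();

  public synchronized void put(int capacity, String label) {
    buffers.put(capacity, label);
  }

  /**
   * Removes and returns the entry with the smallest capacity that is
   * greater than or equal to {@code length}, or null if none exists.
   */
  public synchronized String takeAtLeast(int length) {
    Map.Entry<Integer, String> entry = buffers.ceilingEntry(length);
    if (entry == null) {
      return null;
    }
    buffers.remove(entry.getKey());
    return entry.getValue();
  }
}
```

Declaring the field as plain `Map` would not compile against `ceilingEntry`, which may be the case where the type "has to be explicitly" something more specific.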



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java:
##########
@@ -45,4 +45,6 @@ public interface ByteBufferPool {
    * @param buffer    a direct bytebuffer
    */
   void putBuffer(ByteBuffer buffer);
+
+  default void release() { }

Review Comment:
   javadoc?



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+
+/**
+ * Buffer pool implementation which uses weak references to store
+ * buffers in the pool, such that they are garbage collected when
+ * there are no references to the buffer during a gc run. This is
+ * important as direct buffer don't get garbage collected automatically
+ * during a gc run as they are not stored on heap memory.
+ * Also the buffers are stored in a tree map which helps in returning
+ * smallest buffer whose size is just greater than requested length.
+ * This is a thread safe implementation.
+ */
+public final class WeakReferencedElasticByteBufferPool extends ElasticByteBufferPool {
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> directBuffers =
+          new TreeMap<>();
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> heapBuffers =
+          new TreeMap<>();
+
+  private TreeMap<Key, WeakReference<ByteBuffer>> getBufferTree(boolean isDirect) {
+    return isDirect ? directBuffers : heapBuffers;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param direct whether we want a direct byte buffer or a heap one.
+   * @param length length of requested buffer.
+   * @return returns equal or next greater than capacity buffer from
+   * pool if already available and not garbage collected else creates
+   * a new buffer and return it.
+   */
+  @Override
+  public synchronized ByteBuffer getBuffer(boolean direct, int length) {
+    TreeMap<Key, WeakReference<ByteBuffer>> buffersTree = getBufferTree(direct);
+
+    // Scan the entire tree and remove all weak null references.
+    buffersTree.entrySet().removeIf(next -> next.getValue().get() == null);
+
+    Map.Entry<Key, WeakReference<ByteBuffer>> entry =
+            buffersTree.ceilingEntry(new Key(length, 0));
+    // If there is no buffer present in the pool with desired size.
+    if (entry == null) {
+      return direct ? ByteBuffer.allocateDirect(length) :
+                      ByteBuffer.allocate(length);
+    }
+    // buffer is available in the pool and not garbage collected.
+    WeakReference<ByteBuffer> bufferInPool = entry.getValue();
+    buffersTree.remove(entry.getKey());
+    ByteBuffer buffer = bufferInPool.get();
+    if (buffer != null) {
+      return buffer;
+    }
+    // buffer was in pool but already got garbage collected.
+    return direct ? ByteBuffer.allocateDirect(length) :

Review Comment:
   Sorry, I meant:
   ```java
   return direct
   	? ByteBuffer.allocateDirect(length)
   	: ByteBuffer.allocate(length);
   ```
   
   Put the `?` and `:` first to maximise visibility.
   



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+
+/**
+ * Buffer pool implementation which uses weak references to store
+ * buffers in the pool, such that they are garbage collected when
+ * there are no references to the buffer during a gc run. This is
+ * important as direct buffers don't get garbage collected automatically
+ * during a gc run as they are not stored on heap memory.
+ * Also the buffers are stored in a tree map which helps in returning
+ * smallest buffer whose size is just greater than requested length.
+ * This is a thread safe implementation.
+ */
+public final class WeakReferencedElasticByteBufferPool extends ElasticByteBufferPool {

Review Comment:
   Tag as @Private @Unstable.



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMoreWeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Non parameterized tests for {@code WeakReferencedElasticByteBufferPool}.
+ */
+public class TestMoreWeakReferencedElasticByteBufferPool

Review Comment:
   add a test for
   * returning a buffer which isn't in the pool
   * null buffer (expecting NPE somewhere, presumably)
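
A rough sketch of what the two requested cases exercise. To stay self-contained it uses a hypothetical `MiniWeakPool` stand-in (heap buffers only, keyed by capacity alone, so equal-capacity buffers overwrite each other) rather than the real `WeakReferencedElasticByteBufferPool`; the actual tests would presumably use JUnit with AssertJ and `intercept`, as the rest of the test class does.

```java
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

public class MiniWeakPool {

  // Hypothetical simplified pool: capacity -> weakly referenced buffer.
  private final TreeMap<Integer, WeakReference<ByteBuffer>> heapBuffers =
      new TreeMap<>();

  public synchronized ByteBuffer getBuffer(int length) {
    // Drop entries whose buffers were already garbage collected.
    heapBuffers.entrySet().removeIf(e -> e.getValue().get() == null);
    Map.Entry<Integer, WeakReference<ByteBuffer>> entry =
        heapBuffers.ceilingEntry(length);
    if (entry == null) {
      return ByteBuffer.allocate(length);
    }
    heapBuffers.remove(entry.getKey());
    ByteBuffer buffer = entry.getValue().get();
    return buffer != null
        ? buffer
        : ByteBuffer.allocate(length);
  }

  public synchronized void putBuffer(ByteBuffer buffer) {
    // Null is rejected up front with a NullPointerException.
    Objects.requireNonNull(buffer, "buffer must not be null");
    buffer.clear();
    // A buffer that never came from this pool is accepted like any other.
    heapBuffers.put(buffer.capacity(), new WeakReference<>(buffer));
  }

  public synchronized int size() {
    return heapBuffers.size();
  }
}
```

In this stand-in, a buffer allocated outside the pool is simply adopted on `putBuffer` and handed back by a later `getBuffer` of a suitable length; whether the real pool should behave the same way is the design question the requested test would pin down.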



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+
+/**
+ * Buffer pool implementation which uses weak references to store
+ * buffers in the pool, such that they are garbage collected when
+ * there are no references to the buffer during a gc run. This is
+ * important as direct buffers don't get garbage collected automatically
+ * during a gc run as they are not stored on heap memory.
+ * Also the buffers are stored in a tree map which helps in returning
+ * smallest buffer whose size is just greater than requested length.
+ * This is a thread safe implementation.
+ */
+public final class WeakReferencedElasticByteBufferPool extends ElasticByteBufferPool {
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> directBuffers =
+          new TreeMap<>();
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> heapBuffers =
+          new TreeMap<>();
+
+  private TreeMap<Key, WeakReference<ByteBuffer>> getBufferTree(boolean isDirect) {
+    return isDirect
+            ? directBuffers
+            : heapBuffers;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param direct whether we want a direct byte buffer or a heap one.
+   * @param length length of requested buffer.
+   * @return returns equal or next greater than capacity buffer from
+   * pool if already available and not garbage collected else creates
+   * a new buffer and return it.
+   */
+  @Override
+  public synchronized ByteBuffer getBuffer(boolean direct, int length) {
+    TreeMap<Key, WeakReference<ByteBuffer>> buffersTree = getBufferTree(direct);
+
+    // Scan the entire tree and remove all weak null references.
+    buffersTree.entrySet().removeIf(next -> next.getValue().get() == null);
+
+    Map.Entry<Key, WeakReference<ByteBuffer>> entry =
+            buffersTree.ceilingEntry(new Key(length, 0));
+    // If there is no buffer present in the pool with desired size.
+    if (entry == null) {
+      return direct ? ByteBuffer.allocateDirect(length) :
+                      ByteBuffer.allocate(length);
+    }
+    // buffer is available in the pool and not garbage collected.
+    WeakReference<ByteBuffer> bufferInPool = entry.getValue();
+    buffersTree.remove(entry.getKey());
+    ByteBuffer buffer = bufferInPool.get();
+    if (buffer != null) {
+      return buffer;
+    }
+    // buffer was in pool but already got garbage collected.
+    return direct ? ByteBuffer.allocateDirect(length) :
+            ByteBuffer.allocate(length);
+  }
+
+  /**
+   * Return buffer to the pool.
+   * @param buffer buffer to be returned.
+   */
+  @Override
+  public synchronized void putBuffer(ByteBuffer buffer) {
+    buffer.clear();
+    TreeMap<Key, WeakReference<ByteBuffer>> buffersTree = getBufferTree(buffer.isDirect());
+    // Buffers are indexed by (capacity, time).
+    // If our key is not unique on the first try, we try again, since the
+    // time will be different.  Since we use nanoseconds, it's pretty
+    // unlikely that we'll loop even once, unless the system clock has a
+    // poor granularity or multi-socket systems have clocks slightly out
+    // of sync.
+    while (true) {
+      Key keyToInsert = new Key(buffer.capacity(), System.nanoTime());
+      if (!buffersTree.containsKey(keyToInsert)) {
+        buffersTree.put(keyToInsert, new WeakReference<>(buffer));
+        return;
+      }
+    }
+  }
+
+  /**
+   * Clear the buffer pool thus releasing all the buffers.
+   * The caller must remove all references of
+   * existing buffers before calling this method to avoid
+   * memory leaks.
+   */
+  @Override
+  public synchronized void release() {
+    heapBuffers.clear();
+    directBuffers.clear();
+  }
+
+  @VisibleForTesting
+  public int getCurrentBuffersCount(boolean isDirect) {
+    return isDirect ? directBuffers.size() : heapBuffers.size();

Review Comment:
   * same nit about splitting at ? : boundaries
   * does this need to be synchronized?
   * javadocs
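
One way the three points above could be applied together, as a minimal sketch. `BufferCountSketch` and its `put` method are hypothetical and exist only to make the example self-contained; the field names mirror the quoted diff. The accessor is synchronized because `TreeMap` is not thread safe and the other methods mutate the maps under the same lock.

```java
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.TreeMap;

public class BufferCountSketch {

  private final TreeMap<Long, WeakReference<ByteBuffer>> directBuffers = new TreeMap<>();
  private final TreeMap<Long, WeakReference<ByteBuffer>> heapBuffers = new TreeMap<>();
  private long nextKey;

  // Hypothetical helper: stores a weak reference in the tree matching the buffer type.
  synchronized void put(ByteBuffer buffer) {
    TreeMap<Long, WeakReference<ByteBuffer>> tree = buffer.isDirect()
        ? directBuffers
        : heapBuffers;
    tree.put(nextKey++, new WeakReference<>(buffer));
  }

  /**
   * Get the number of entries currently held in the pool.
   * Entries whose buffers were garbage collected but not yet purged are included.
   * @param isDirect true to count direct buffers, false to count heap buffers.
   * @return the number of entries of the requested type.
   */
  synchronized int getCurrentBuffersCount(boolean isDirect) {
    return isDirect
        ? directBuffers.size()
        : heapBuffers.size();
  }

  public static void main(String[] args) {
    BufferCountSketch pool = new BufferCountSketch();
    pool.put(ByteBuffer.allocate(8));
    pool.put(ByteBuffer.allocateDirect(8));
    pool.put(ByteBuffer.allocateDirect(16));
    System.out.println("heap=" + pool.getCurrentBuffersCount(false)
        + " direct=" + pool.getCurrentBuffersCount(true));
  }
}
```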



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+
+/**
+ * Buffer pool implementation which uses weak references to store
+ * buffers in the pool, such that they are garbage collected when
+ * there are no references to the buffer during a gc run. This is
+ * important as direct buffers don't get garbage collected automatically
+ * during a gc run as they are not stored on heap memory.
+ * Also the buffers are stored in a tree map which helps in returning
+ * smallest buffer whose size is just greater than requested length.
+ * This is a thread safe implementation.
+ */
+public final class WeakReferencedElasticByteBufferPool extends ElasticByteBufferPool {
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> directBuffers =
+          new TreeMap<>();
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> heapBuffers =
+          new TreeMap<>();
+
+  private TreeMap<Key, WeakReference<ByteBuffer>> getBufferTree(boolean isDirect) {
+    return isDirect
+            ? directBuffers
+            : heapBuffers;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param direct whether we want a direct byte buffer or a heap one.
+   * @param length length of requested buffer.
+   * @return returns equal or next greater than capacity buffer from
+   * pool if already available and not garbage collected else creates
+   * a new buffer and return it.
+   */
+  @Override
+  public synchronized ByteBuffer getBuffer(boolean direct, int length) {
+    TreeMap<Key, WeakReference<ByteBuffer>> buffersTree = getBufferTree(direct);
+
+    // Scan the entire tree and remove all weak null references.
+    buffersTree.entrySet().removeIf(next -> next.getValue().get() == null);
+
+    Map.Entry<Key, WeakReference<ByteBuffer>> entry =
+            buffersTree.ceilingEntry(new Key(length, 0));
+    // If there is no buffer present in the pool with desired size.
+    if (entry == null) {
+      return direct ? ByteBuffer.allocateDirect(length) :
+                      ByteBuffer.allocate(length);
+    }
+    // buffer is available in the pool and not garbage collected.
+    WeakReference<ByteBuffer> bufferInPool = entry.getValue();
+    buffersTree.remove(entry.getKey());
+    ByteBuffer buffer = bufferInPool.get();
+    if (buffer != null) {
+      return buffer;
+    }
+    // buffer was in pool but already got garbage collected.
+    return direct ? ByteBuffer.allocateDirect(length) :
+            ByteBuffer.allocate(length);
+  }
+
+  /**
+   * Return buffer to the pool.
+   * @param buffer buffer to be returned.
+   */
+  @Override
+  public synchronized void putBuffer(ByteBuffer buffer) {
+    buffer.clear();
+    TreeMap<Key, WeakReference<ByteBuffer>> buffersTree = getBufferTree(buffer.isDirect());
+    // Buffers are indexed by (capacity, time).
+    // If our key is not unique on the first try, we try again, since the
+    // time will be different.  Since we use nanoseconds, it's pretty
+    // unlikely that we'll loop even once, unless the system clock has a
+    // poor granularity or multi-socket systems have clocks slightly out
+    // of sync.
+    while (true) {
+      Key keyToInsert = new Key(buffer.capacity(), System.nanoTime());
+      if (!buffersTree.containsKey(keyToInsert)) {
+        buffersTree.put(keyToInsert, new WeakReference<>(buffer));
+        return;
+      }
+    }

Review Comment:
   what if the buffer wasn't found? log at warning? LogExactlyOnce, maybe?





[GitHub] [hadoop] mukund-thakur commented on a diff in pull request #4263: HADOOP-18105 Implement buffer pooling with weak references

Posted by GitBox <gi...@apache.org>.
mukund-thakur commented on code in PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#discussion_r882004680


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+
+/**
+ * Buffer pool implementation which uses weak references to store
+ * buffers in the pool, such that they are garbage collected when
+ * there are no references to the buffer during a gc run. This is
+ * important as direct buffers don't get garbage collected automatically
+ * during a gc run as they are not stored on heap memory.
+ * Also the buffers are stored in a tree map which helps in returning
+ * smallest buffer whose size is just greater than requested length.
+ * This is a thread safe implementation.
+ */
+public final class WeakReferencedElasticByteBufferPool extends ElasticByteBufferPool {
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> directBuffers =
+          new TreeMap<>();
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> heapBuffers =
+          new TreeMap<>();
+
+  private TreeMap<Key, WeakReference<ByteBuffer>> getBufferTree(boolean isDirect) {
+    return isDirect
+            ? directBuffers
+            : heapBuffers;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param direct whether we want a direct byte buffer or a heap one.
+   * @param length length of requested buffer.
+   * @return returns equal or next greater than capacity buffer from
+   * pool if already available and not garbage collected else creates
+   * a new buffer and return it.
+   */
+  @Override
+  public synchronized ByteBuffer getBuffer(boolean direct, int length) {
+    TreeMap<Key, WeakReference<ByteBuffer>> buffersTree = getBufferTree(direct);
+
+    // Scan the entire tree and remove all weak null references.
+    buffersTree.entrySet().removeIf(next -> next.getValue().get() == null);
+
+    Map.Entry<Key, WeakReference<ByteBuffer>> entry =
+            buffersTree.ceilingEntry(new Key(length, 0));
+    // If there is no buffer present in the pool with desired size.
+    if (entry == null) {
+      return direct ? ByteBuffer.allocateDirect(length) :
+                      ByteBuffer.allocate(length);
+    }
+    // buffer is available in the pool and not garbage collected.
+    WeakReference<ByteBuffer> bufferInPool = entry.getValue();
+    buffersTree.remove(entry.getKey());
+    ByteBuffer buffer = bufferInPool.get();
+    if (buffer != null) {
+      return buffer;
+    }
+    // buffer was in pool but already got garbage collected.
+    return direct ? ByteBuffer.allocateDirect(length) :
+            ByteBuffer.allocate(length);
+  }
+
+  /**
+   * Return buffer to the pool.
+   * @param buffer buffer to be returned.
+   */
+  @Override
+  public synchronized void putBuffer(ByteBuffer buffer) {
+    buffer.clear();
+    TreeMap<Key, WeakReference<ByteBuffer>> buffersTree = getBufferTree(buffer.isDirect());
+    // Buffers are indexed by (capacity, time).
+    // If our key is not unique on the first try, we try again, since the
+    // time will be different.  Since we use nanoseconds, it's pretty
+    // unlikely that we'll loop even once, unless the system clock has a
+    // poor granularity or multi-socket systems have clocks slightly out
+    // of sync.
+    while (true) {
+      Key keyToInsert = new Key(buffer.capacity(), System.nanoTime());
+      if (!buffersTree.containsKey(keyToInsert)) {
+        buffersTree.put(keyToInsert, new WeakReference<>(buffer));
+        return;
+      }
+    }
+  }
+
+  /**
+   * Clear the buffer pool thus releasing all the buffers.
+   * The caller must remove all references of
+   * existing buffers before calling this method to avoid
+   * memory leaks.
+   */
+  @Override
+  public synchronized void release() {
+    heapBuffers.clear();
+    directBuffers.clear();
+  }
+
+  @VisibleForTesting
+  public int getCurrentBuffersCount(boolean isDirect) {
+    return isDirect ? directBuffers.size() : heapBuffers.size();

Review Comment:
   This is only for testing, but I have made it synchronized anyway.





[GitHub] [hadoop] hadoop-yetus commented on pull request #4263: HADOOP-18105 Implement buffer pooling with weak references

Posted by GitBox <gi...@apache.org>.
hadoop-yetus commented on PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#issuecomment-1142808796

   :broken_heart: **-1 overall**
   
   
   
   
   
   
   | Vote | Subsystem | Runtime |  Logfile | Comment |
   |:----:|----------:|--------:|:--------:|:-------:|
   | +0 :ok: |  reexec  |   0m 56s |  |  Docker mode activated.  |
   |||| _ Prechecks _ |
   | +1 :green_heart: |  dupname  |   0m  0s |  |  No case conflicting files found.  |
   | +0 :ok: |  codespell  |   0m  0s |  |  codespell was not available.  |
   | +1 :green_heart: |  @author  |   0m  0s |  |  The patch does not contain any @author tags.  |
   | +1 :green_heart: |  test4tests  |   0m  0s |  |  The patch appears to include 2 new or modified test files.  |
   |||| _ feature-vectored-io Compile Tests _ |
   | +1 :green_heart: |  mvninstall  |  40m 25s |  |  feature-vectored-io passed  |
   | +1 :green_heart: |  compile  |  25m 10s |  |  feature-vectored-io passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1  |
   | +1 :green_heart: |  compile  |  21m 42s |  |  feature-vectored-io passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07  |
   | +1 :green_heart: |  checkstyle  |   1m 31s |  |  feature-vectored-io passed  |
   | +1 :green_heart: |  mvnsite  |   1m 59s |  |  feature-vectored-io passed  |
   | -1 :x: |  javadoc  |   1m 37s | [/branch-javadoc-hadoop-common-project_hadoop-common-jdkPrivateBuild-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.txt](https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/5/artifact/out/branch-javadoc-hadoop-common-project_hadoop-common-jdkPrivateBuild-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.txt) |  hadoop-common in feature-vectored-io failed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.  |
   | +1 :green_heart: |  javadoc  |   2m  1s |  |  feature-vectored-io passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07  |
   | +1 :green_heart: |  spotbugs  |   3m  5s |  |  feature-vectored-io passed  |
   | +1 :green_heart: |  shadedclient  |  26m  5s |  |  branch has no errors when building and testing our client artifacts.  |
   |||| _ Patch Compile Tests _ |
   | +1 :green_heart: |  mvninstall  |   1m  5s |  |  the patch passed  |
   | +1 :green_heart: |  compile  |  24m 12s |  |  the patch passed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1  |
   | +1 :green_heart: |  javac  |  24m 12s |  |  the patch passed  |
   | +1 :green_heart: |  compile  |  21m 33s |  |  the patch passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07  |
   | +1 :green_heart: |  javac  |  21m 33s |  |  the patch passed  |
   | +1 :green_heart: |  blanks  |   0m  0s |  |  The patch has no blanks issues.  |
   | +1 :green_heart: |  checkstyle  |   1m 25s |  |  the patch passed  |
   | +1 :green_heart: |  mvnsite  |   1m 56s |  |  the patch passed  |
   | -1 :x: |  javadoc  |   1m 24s | [/patch-javadoc-hadoop-common-project_hadoop-common-jdkPrivateBuild-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.txt](https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/5/artifact/out/patch-javadoc-hadoop-common-project_hadoop-common-jdkPrivateBuild-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.txt) |  hadoop-common in the patch failed with JDK Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1.  |
   | +1 :green_heart: |  javadoc  |   1m 58s |  |  the patch passed with JDK Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07  |
   | +1 :green_heart: |  spotbugs  |   3m  2s |  |  the patch passed  |
   | +1 :green_heart: |  shadedclient  |  25m 43s |  |  patch has no errors when building and testing our client artifacts.  |
   |||| _ Other Tests _ |
   | +1 :green_heart: |  unit  |  18m 16s |  |  hadoop-common in the patch passed.  |
   | +1 :green_heart: |  asflicense  |   1m 17s |  |  The patch does not generate ASF License warnings.  |
   |  |   | 226m 37s |  |  |
   
   
   | Subsystem | Report/Notes |
   |----------:|:-------------|
   | Docker | ClientAPI=1.41 ServerAPI=1.41 base: https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/5/artifact/out/Dockerfile |
   | GITHUB PR | https://github.com/apache/hadoop/pull/4263 |
   | Optional Tests | dupname asflicense compile javac javadoc mvninstall mvnsite unit shadedclient spotbugs checkstyle codespell |
   | uname | Linux e73fb3e76e1a 4.15.0-175-generic #184-Ubuntu SMP Thu Mar 24 17:48:36 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux |
   | Build tool | maven |
   | Personality | dev-support/bin/hadoop.sh |
   | git revision | feature-vectored-io / 24ac304a1f4cabfa0ddda64d0ab9a785e696ee0f |
   | Default Java | Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
   | Multi-JDK versions | /usr/lib/jvm/java-11-openjdk-amd64:Private Build-11.0.15+10-Ubuntu-0ubuntu0.20.04.1 /usr/lib/jvm/java-8-openjdk-amd64:Private Build-1.8.0_312-8u312-b07-0ubuntu1~20.04-b07 |
   |  Test Results | https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/5/testReport/ |
   | Max. process+thread count | 1236 (vs. ulimit of 5500) |
   | modules | C: hadoop-common-project/hadoop-common U: hadoop-common-project/hadoop-common |
   | Console output | https://ci-hadoop.apache.org/job/hadoop-multibranch/job/PR-4263/5/console |
   | versions | git=2.25.1 maven=3.6.3 spotbugs=4.2.2 |
   | Powered by | Apache Yetus 0.14.0-SNAPSHOT https://yetus.apache.org |
   
   
   This message was automatically generated.
   
   




[GitHub] [hadoop] steveloughran commented on pull request #4263: HADOOP-18105 Implement buffer pooling with weak references

Posted by GitBox <gi...@apache.org>.
steveloughran commented on PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#issuecomment-1123466433

   there are some active patches for javadoc; don't worry too much for now.




[GitHub] [hadoop] steveloughran commented on a diff in pull request #4263: HADOOP-18105 Implement buffer pooling with weak references

Posted by GitBox <gi...@apache.org>.
steveloughran commented on code in PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#discussion_r875762848


##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMoreWeakReferencedElasticByteBufferPool.java:
##########
@@ -52,6 +55,32 @@ public void testMixedBuffersInPool() {
 
   }
 
+  @Test
+  public void testUnexpectedBufferSizes() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer1 = pool.getBuffer(true, 0);
+    try {

Review Comment:
   use LambdaTestUtils.intercept() here and below
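
For illustration, the shape of the suggested change, as a sketch only. The `intercept` method below is a hypothetical local stand-in written so the example compiles without Hadoop on the classpath; it imitates the `intercept(Class, Callable)` form of `LambdaTestUtils` and is not the real implementation. The body of the original `try` block is not shown in the quoted diff, so the failing call here (a `put` on a zero-capacity buffer, which the JDK rejects with `BufferOverflowException`) is an assumption.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.concurrent.Callable;

public class InterceptSketch {

  // Stand-in helper: run the callable, return the expected exception,
  // and fail if the call returns normally or throws something else.
  static <T, E extends Throwable> E intercept(Class<E> clazz, Callable<T> eval)
      throws Exception {
    T result;
    try {
      result = eval.call();
    } catch (Exception e) {
      if (clazz.isInstance(e)) {
        return clazz.cast(e);
      }
      throw e; // unexpected exception type: let it propagate
    }
    throw new AssertionError(
        "Expected " + clazz.getName() + " but the call returned " + result);
  }

  // Replaces the try { ...; fail(); } catch (BufferOverflowException e) { } pattern.
  static boolean emptyBufferOverflows() {
    ByteBuffer empty = ByteBuffer.allocate(0);
    try {
      BufferOverflowException e =
          intercept(BufferOverflowException.class, () -> empty.put((byte) 1));
      return e != null;
    } catch (Exception unexpected) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("overflow intercepted: " + emptyBufferOverflows());
  }
}
```

The gain over a hand-written try/catch is that a call which unexpectedly succeeds fails the test with a message naming the expected exception, instead of passing silently when the `fail()` call is forgotten.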





[GitHub] [hadoop] mukund-thakur commented on pull request #4263: HADOOP-18105 Implement buffer pooling with weak references

Posted by GitBox <gi...@apache.org>.
mukund-thakur commented on PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#issuecomment-1122830097

   Don't know why javadoc is playing up in all my patches. 




[GitHub] [hadoop] steveloughran commented on a diff in pull request #4263: HADOOP-18105 Implement buffer pooling with weak references

Posted by GitBox <gi...@apache.org>.
steveloughran commented on code in PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#discussion_r870105560


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+
+/**
+ * Buffer pool implementation which uses weak references to store
+ * buffers in the pool, such that they are garbage collected when
+ * there are no references to the buffer during a gc run. This is
+ * important as direct buffer don't get garbage collected automatically

Review Comment:
   "direct buffers"



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+
+/**
+ * Buffer pool implementation which uses weak references to store
+ * buffers in the pool, such that they are garbage collected when
+ * there are no references to the buffer during a gc run. This is
+ * important as direct buffer don't get garbage collected automatically
+ * during a gc run as they are not stored on heap memory.
+ * Also the buffers are stored in a tree map which helps in returning
+ * smallest buffer whose size is just greater than requested length.
+ * This is a thread safe implementation.
+ */
+public final class WeakReferencedElasticByteBufferPool extends ElasticByteBufferPool {
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> directBuffers =
+          new TreeMap<>();
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> heapBuffers =
+          new TreeMap<>();
+
+  private TreeMap<Key, WeakReference<ByteBuffer>> getBufferTree(boolean isDirect) {
+    return isDirect ? directBuffers : heapBuffers;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param direct whether we want a direct byte buffer or a heap one.
+   * @param length length of requested buffer.
+   * @return returns equal or next greater than capacity buffer from
+   * pool if already available and not garbage collected else creates
+   * a new buffer and return it.
+   */
+  @Override
+  public synchronized ByteBuffer getBuffer(boolean direct, int length) {
+    TreeMap<Key, WeakReference<ByteBuffer>> buffersTree = getBufferTree(direct);
+
+    // Scan the entire tree and remove all weak null references.
+    buffersTree.entrySet().removeIf(next -> next.getValue().get() == null);
+
+    Map.Entry<Key, WeakReference<ByteBuffer>> entry =
+            buffersTree.ceilingEntry(new Key(length, 0));
+    // If there is no buffer present in the pool with desired size.
+    if (entry == null) {
+      return direct ? ByteBuffer.allocateDirect(length) :
+                      ByteBuffer.allocate(length);
+    }
+    // buffer is available in the pool and not garbage collected.
+    WeakReference<ByteBuffer> bufferInPool = entry.getValue();
+    buffersTree.remove(entry.getKey());
+    ByteBuffer buffer = bufferInPool.get();
+    if (buffer != null) {
+      return buffer;
+    }
+    // buffer was in pool but already got garbage collected.
+    return direct ? ByteBuffer.allocateDirect(length) :
+            ByteBuffer.allocate(length);
+  }
+
+  /**
+   * Return buffer to the pool.
+   * @param buffer buffer to be returned.
+   */
+  @Override
+  public synchronized void putBuffer(ByteBuffer buffer) {
+    buffer.clear();
+    TreeMap<Key, WeakReference<ByteBuffer>> buffersTree = getBufferTree(buffer.isDirect());
+    // Buffers are indexed by (capacity, time).
+    // If our key is not unique on the first try, we try again, since the
+    // time will be different.  Since we use nanoseconds, it's pretty
+    // unlikely that we'll loop even once, unless the system clock has a
+    // poor granularity.
+    while (true) {
+      Key keyToInsert = new Key(buffer.capacity(), System.nanoTime());
+      if (!buffersTree.containsKey(keyToInsert)) {
+        buffersTree.put(keyToInsert, new WeakReference<>(buffer));
+        return;
+      }
+    }
+  }
+
+  /**
+   * Clear the buffer pool thus releasing all the buffers.
+   * The caller must remove all references of
+   * existing buffers before calling this method to avoid
+   * memory leaks.
+   */
+  @Override
+  public void release() {

Review Comment:
   synchronized?



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+
+/**
+ * Buffer pool implementation which uses weak references to store
+ * buffers in the pool, such that they are garbage collected when
+ * there are no references to the buffer during a gc run. This is
+ * important as direct buffer don't get garbage collected automatically
+ * during a gc run as they are not stored on heap memory.
+ * Also the buffers are stored in a tree map which helps in returning
+ * smallest buffer whose size is just greater than requested length.
+ * This is a thread safe implementation.
+ */
+public final class WeakReferencedElasticByteBufferPool extends ElasticByteBufferPool {
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> directBuffers =
+          new TreeMap<>();
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> heapBuffers =
+          new TreeMap<>();
+
+  private TreeMap<Key, WeakReference<ByteBuffer>> getBufferTree(boolean isDirect) {
+    return isDirect ? directBuffers : heapBuffers;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param direct whether we want a direct byte buffer or a heap one.
+   * @param length length of requested buffer.
+   * @return a pooled buffer whose capacity is equal to, or the next
+   * greater than, the requested length if one is available and has not
+   * been garbage collected; otherwise a newly allocated buffer.
+   */
+  @Override
+  public synchronized ByteBuffer getBuffer(boolean direct, int length) {
+    TreeMap<Key, WeakReference<ByteBuffer>> buffersTree = getBufferTree(direct);
+
+    // Scan the entire tree and remove all weak null references.
+    buffersTree.entrySet().removeIf(next -> next.getValue().get() == null);
+
+    Map.Entry<Key, WeakReference<ByteBuffer>> entry =
+            buffersTree.ceilingEntry(new Key(length, 0));
+    // If there is no buffer present in the pool with desired size.
+    if (entry == null) {
+      return direct ? ByteBuffer.allocateDirect(length) :
+                      ByteBuffer.allocate(length);
+    }
+    // buffer is available in the pool and not garbage collected.
+    WeakReference<ByteBuffer> bufferInPool = entry.getValue();
+    buffersTree.remove(entry.getKey());
+    ByteBuffer buffer = bufferInPool.get();
+    if (buffer != null) {
+      return buffer;
+    }
+    // buffer was in pool but already got garbage collected.
+    return direct ? ByteBuffer.allocateDirect(length) :

Review Comment:
   nit: can you put the `?` and `:` clauses on separate lines to make them more visible?
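
A sketch of the layout being asked for, with each branch of the conditional on its own line. The helper allocate() is hypothetical; the PR inlines this expression inside getBuffer().

```java
import java.nio.ByteBuffer;

class TernaryLayoutSketch {
  // Hypothetical helper wrapping the expression from the diff above.
  static ByteBuffer allocate(boolean direct, int length) {
    return direct
        ? ByteBuffer.allocateDirect(length)
        : ByteBuffer.allocate(length);
  }

  public static void main(String[] args) {
    System.out.println(allocate(true, 8).isDirect());
    System.out.println(allocate(false, 8).isDirect());
  }
}
```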



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+
+/**
+ * Buffer pool implementation which uses weak references to store
+ * buffers in the pool, so that a buffer can be garbage collected
+ * once nothing outside the pool references it. This is important
+ * for direct buffers: their memory is allocated outside the Java heap
+ * and is only released after the buffer object itself is collected.
+ * The buffers are stored in a tree map, which helps in returning the
+ * smallest buffer whose capacity is at least the requested length.
+ * This is a thread-safe implementation.
+ */
+public final class WeakReferencedElasticByteBufferPool extends ElasticByteBufferPool {
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> directBuffers =
+          new TreeMap<>();
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> heapBuffers =
+          new TreeMap<>();
+
+  private TreeMap<Key, WeakReference<ByteBuffer>> getBufferTree(boolean isDirect) {
+    return isDirect ? directBuffers : heapBuffers;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param direct whether we want a direct byte buffer or a heap one.
+   * @param length length of requested buffer.
+   * @return a pooled buffer whose capacity is equal to, or the next
+   * greater than, the requested length if one is available and has not
+   * been garbage collected; otherwise a newly allocated buffer.
+   */
+  @Override
+  public synchronized ByteBuffer getBuffer(boolean direct, int length) {
+    TreeMap<Key, WeakReference<ByteBuffer>> buffersTree = getBufferTree(direct);
+
+    // Scan the entire tree and remove all weak null references.
+    buffersTree.entrySet().removeIf(next -> next.getValue().get() == null);
+
+    Map.Entry<Key, WeakReference<ByteBuffer>> entry =
+            buffersTree.ceilingEntry(new Key(length, 0));
+    // If there is no buffer present in the pool with desired size.
+    if (entry == null) {
+      return direct ? ByteBuffer.allocateDirect(length) :
+                      ByteBuffer.allocate(length);
+    }
+    // buffer is available in the pool and not garbage collected.
+    WeakReference<ByteBuffer> bufferInPool = entry.getValue();
+    buffersTree.remove(entry.getKey());
+    ByteBuffer buffer = bufferInPool.get();
+    if (buffer != null) {
+      return buffer;
+    }
+    // buffer was in pool but already got garbage collected.
+    return direct ? ByteBuffer.allocateDirect(length) :
+            ByteBuffer.allocate(length);
+  }
+
+  /**
+   * Return buffer to the pool.
+   * @param buffer buffer to be returned.
+   */
+  @Override
+  public synchronized void putBuffer(ByteBuffer buffer) {
+    buffer.clear();
+    TreeMap<Key, WeakReference<ByteBuffer>> buffersTree = getBufferTree(buffer.isDirect());
+    // Buffers are indexed by (capacity, time).
+    // If our key is not unique on the first try, we try again, since the
+    // time will be different.  Since we use nanoseconds, it's pretty
+    // unlikely that we'll loop even once, unless the system clock has a
+    // poor granularity.

Review Comment:
   Or on multi-socket systems whose clocks are slightly out of sync.
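
The retry described in the code comment above can be sketched as follows. This is an illustration under assumptions, not the PR's implementation: the class KeyRetrySketch, its Key type and the injectable clock are all hypothetical, and the clock is injectable only so that a collision can be forced.

```java
import java.util.TreeMap;
import java.util.function.LongSupplier;

class KeyRetrySketch {
  // Hypothetical key: ordered by capacity first, then by insertion time.
  static final class Key implements Comparable<Key> {
    final int capacity;
    final long insertionTime;

    Key(int capacity, long insertionTime) {
      this.capacity = capacity;
      this.insertionTime = insertionTime;
    }

    @Override
    public int compareTo(Key other) {
      int c = Integer.compare(capacity, other.capacity);
      return c != 0 ? c : Long.compare(insertionTime, other.insertionTime);
    }
  }

  private final TreeMap<Key, String> tree = new TreeMap<>();
  private final LongSupplier clock;

  KeyRetrySketch(LongSupplier clock) {
    this.clock = clock;
  }

  // Re-reads the clock until the (capacity, time) pair is unused.
  // Returns the number of attempts that were needed.
  synchronized int put(int capacity, String value) {
    int attempts = 0;
    while (true) {
      attempts++;
      Key key = new Key(capacity, clock.getAsLong());
      if (!tree.containsKey(key)) {
        tree.put(key, value);
        return attempts;
      }
    }
  }

  synchronized int size() {
    return tree.size();
  }

  public static void main(String[] args) {
    KeyRetrySketch sketch = new KeyRetrySketch(System::nanoTime);
    System.out.println("attempts: " + sketch.put(10, "first"));
    System.out.println("attempts: " + sketch.put(10, "second"));
  }
}
```

Because the loop only needs a timestamp that is not already in the tree, a clock that repeats or steps backwards costs extra iterations rather than lost entries.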



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMoreWeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.nio.ByteBuffer;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+/**
+ * Non parameterized tests for {@code WeakReferencedElasticByteBufferPool}.
+ */
+public class TestMoreWeakReferencedElasticByteBufferPool {
+
+  @Test
+  public void testMixedBuffersInPool() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer1 = pool.getBuffer(true, 5);
+    ByteBuffer buffer2 = pool.getBuffer(true, 10);
+    ByteBuffer buffer3 = pool.getBuffer(false, 5);
+    ByteBuffer buffer4 = pool.getBuffer(false, 10);
+    ByteBuffer buffer5 = pool.getBuffer(true, 15);
+
+    assertBufferCounts(pool, 0, 0);
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    assertBufferCounts(pool, 2, 0);
+    pool.putBuffer(buffer3);
+    assertBufferCounts(pool, 2, 1);
+    pool.putBuffer(buffer5);
+    assertBufferCounts(pool, 3, 1);
+    pool.putBuffer(buffer4);
+    assertBufferCounts(pool, 3, 2);
+    pool.release();
+    assertBufferCounts(pool, 0, 0);
+
+  }
+
+  private void assertBufferCounts(WeakReferencedElasticByteBufferPool pool,

Review Comment:
   can you add a javadoc?



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Unit tests for {@code WeakReferencedElasticByteBufferPool}.
+ */
+@RunWith(Parameterized.class)
+public class TestWeakReferencedElasticByteBufferPool {
+
+  private final boolean isDirect;
+
+  private final String type;
+
+  @Parameterized.Parameters(name = "Buffer type : {0}")
+  public static List<String> params() {
+    return Arrays.asList("direct", "array");
+  }
+
+  public TestWeakReferencedElasticByteBufferPool(String type) {
+    this.type = type;
+    this.isDirect = !"array".equals(type);
+  }
+
+  // Add more tests for different time and same size buffers in the pool. 
+  @Test
+  public void testGetAndPutBasic() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    int bufferSize = 5;
+    ByteBuffer buffer = pool.getBuffer(isDirect, bufferSize);
+    Assertions.assertThat(buffer.isDirect())
+            .describedAs("Buffer returned should be of correct type %s", type)
+            .isEqualTo(isDirect);
+    Assertions.assertThat(buffer.capacity())
+            .describedAs("Initial capacity of returned buffer from pool")
+            .isEqualTo(bufferSize);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Initial position of returned buffer from pool")
+            .isEqualTo(0);
+
+    byte[] arr = createByteArray(bufferSize);
+    buffer.put(arr, 0, arr.length);
+    buffer.flip();
+    validateBufferContent(buffer, arr);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Buffer's position after filling bytes in it")
+            .isEqualTo(bufferSize);
+    // releasing buffer to the pool.
+    pool.putBuffer(buffer);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Position should be reset to 0 after returning buffer to the pool")
+            .isEqualTo(0);
+
+  }
+
+  @Test
+  public void testPoolingWithDifferentSizes() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 5);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 15);
+
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    ByteBuffer buffer3 = pool.getBuffer(isDirect, 12);
+    Assertions.assertThat(buffer3.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(15);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(1);
+    pool.putBuffer(buffer);
+    ByteBuffer buffer4 = pool.getBuffer(isDirect, 6);
+    Assertions.assertThat(buffer4.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(10);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(1);
+
+    pool.release();
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool post release")
+            .isEqualTo(0);
+  }
+
+  @Test
+  public void testPoolingWithDifferentInsertionTime() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 10);
+
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    ByteBuffer buffer3 = pool.getBuffer(isDirect, 10);
+    // As buffer1 is returned to the pool before buffer2, it should
+    // be returned when buffer of same size is asked again from
+    // the pool.
+    Assertions.assertThat(buffer3 == buffer1)
+            .describedAs("Buffers should be returned in order of their " +
+                    "insertion time")
+            .isTrue();
+    pool.putBuffer(buffer);
+    ByteBuffer buffer4 = pool.getBuffer(isDirect, 10);
+    Assertions.assertThat(buffer4 == buffer2)

Review Comment:
   Use an equality assertion here rather than asserting on the result of `==`.
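
One point worth keeping in mind when switching assertions: ByteBuffer.equals() compares the remaining content, not object identity, so two distinct cleared buffers of the same capacity compare equal. An identity assertion (in AssertJ that would be isSameAs) is presumably what this test needs. The JDK-only sketch below shows the difference; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

class BufferIdentitySketch {
  // Content comparison: same remaining bytes, possibly different objects.
  static boolean sameContent(ByteBuffer a, ByteBuffer b) {
    return a.equals(b);
  }

  // Identity comparison: the very same object.
  static boolean sameInstance(ByteBuffer a, ByteBuffer b) {
    return a == b;
  }

  public static void main(String[] args) {
    ByteBuffer first = ByteBuffer.allocate(10);
    ByteBuffer second = ByteBuffer.allocate(10);
    // Both buffers hold ten zero bytes, so they are equal by content.
    System.out.println(sameContent(first, second));   // prints "true"
    // They are still two separate objects.
    System.out.println(sameInstance(first, second));  // prints "false"
  }
}
```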



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Unit tests for {@code WeakReferencedElasticByteBufferPool}.
+ */
+@RunWith(Parameterized.class)
+public class TestWeakReferencedElasticByteBufferPool {
+
+  private final boolean isDirect;
+
+  private final String type;
+
+  @Parameterized.Parameters(name = "Buffer type : {0}")
+  public static List<String> params() {
+    return Arrays.asList("direct", "array");
+  }
+
+  public TestWeakReferencedElasticByteBufferPool(String type) {
+    this.type = type;
+    this.isDirect = !"array".equals(type);
+  }
+
+  // Add more tests for different time and same size buffers in the pool. 
+  @Test
+  public void testGetAndPutBasic() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    int bufferSize = 5;
+    ByteBuffer buffer = pool.getBuffer(isDirect, bufferSize);
+    Assertions.assertThat(buffer.isDirect())
+            .describedAs("Buffer returned should be of correct type %s", type)
+            .isEqualTo(isDirect);
+    Assertions.assertThat(buffer.capacity())
+            .describedAs("Initial capacity of returned buffer from pool")
+            .isEqualTo(bufferSize);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Initial position of returned buffer from pool")
+            .isEqualTo(0);
+
+    byte[] arr = createByteArray(bufferSize);
+    buffer.put(arr, 0, arr.length);
+    buffer.flip();
+    validateBufferContent(buffer, arr);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Buffer's position after filling bytes in it")
+            .isEqualTo(bufferSize);
+    // releasing buffer to the pool.
+    pool.putBuffer(buffer);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Position should be reset to 0 after returning buffer to the pool")
+            .isEqualTo(0);
+
+  }
+
+  @Test
+  public void testPoolingWithDifferentSizes() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 5);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 15);
+
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    ByteBuffer buffer3 = pool.getBuffer(isDirect, 12);
+    Assertions.assertThat(buffer3.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(15);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(1);
+    pool.putBuffer(buffer);
+    ByteBuffer buffer4 = pool.getBuffer(isDirect, 6);
+    Assertions.assertThat(buffer4.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(10);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(1);
+
+    pool.release();
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool post release")
+            .isEqualTo(0);
+  }
+
+  @Test
+  public void testPoolingWithDifferentInsertionTime() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 10);
+
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    ByteBuffer buffer3 = pool.getBuffer(isDirect, 10);
+    // As buffer1 is returned to the pool before buffer2, it should
+    // be returned when buffer of same size is asked again from
+    // the pool.
+    Assertions.assertThat(buffer3 == buffer1)
+            .describedAs("Buffers should be returned in order of their " +
+                    "insertion time")
+            .isTrue();
+    pool.putBuffer(buffer);
+    ByteBuffer buffer4 = pool.getBuffer(isDirect, 10);
+    Assertions.assertThat(buffer4 == buffer2)
+            .describedAs("Buffers should be returned in order of their " +
+                    "insertion time")
+            .isTrue();
+  }
+
+  @Test
+  public void testGarbageCollection() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 5);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 15);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    // Before GC.
+    ByteBuffer buffer4 = pool.getBuffer(isDirect, 12);
+    Assertions.assertThat(buffer4.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(15);
+
+    // Removing the references
+    buffer1 = null;
+    buffer2 = null;
+    System.gc();

Review Comment:
   This doesn't reliably trigger a GC; my weak ref test had to create a lot of memory pressure before it worked.
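
A rough sketch of the memory-pressure approach mentioned above: request a GC, allocate and drop some garbage, and repeat for a bounded number of rounds until the weak reference is cleared. The helper awaitCleared() and the sizes used are assumptions for illustration. Nothing here guarantees a collection on any particular JVM, so a test would still need to tolerate the helper returning false.

```java
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;

class GcPressureSketch {
  // Returns true once the referent has been collected, or false if it
  // is still reachable after maxRounds attempts.
  static boolean awaitCleared(WeakReference<?> ref, int maxRounds) {
    for (int round = 0; round < maxRounds && ref.get() != null; round++) {
      System.gc(); // only a hint to the JVM
      // Allocate and immediately drop about 8 MB to add heap pressure.
      List<byte[]> garbage = new ArrayList<>();
      for (int i = 0; i < 8; i++) {
        garbage.add(new byte[1 << 20]);
      }
      garbage.clear();
    }
    return ref.get() == null;
  }

  public static void main(String[] args) {
    WeakReference<byte[]> ref = new WeakReference<>(new byte[1024]);
    System.out.println("cleared: " + awaitCleared(ref, 50));
  }
}
```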



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Unit tests for {@code WeakReferencedElasticByteBufferPool}.
+ */
+@RunWith(Parameterized.class)
+public class TestWeakReferencedElasticByteBufferPool {
+
+  private final boolean isDirect;
+
+  private final String type;
+
+  @Parameterized.Parameters(name = "Buffer type : {0}")
+  public static List<String> params() {
+    return Arrays.asList("direct", "array");
+  }
+
+  public TestWeakReferencedElasticByteBufferPool(String type) {
+    this.type = type;
+    this.isDirect = !"array".equals(type);
+  }
+
+  // Add more tests for different time and same size buffers in the pool. 
+  @Test
+  public void testGetAndPutBasic() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    int bufferSize = 5;
+    ByteBuffer buffer = pool.getBuffer(isDirect, bufferSize);
+    Assertions.assertThat(buffer.isDirect())
+            .describedAs("Buffer returned should be of correct type %s", type)
+            .isEqualTo(isDirect);
+    Assertions.assertThat(buffer.capacity())
+            .describedAs("Initial capacity of returned buffer from pool")
+            .isEqualTo(bufferSize);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Initial position of returned buffer from pool")
+            .isEqualTo(0);
+
+    byte[] arr = createByteArray(bufferSize);
+    buffer.put(arr, 0, arr.length);
+    buffer.flip();
+    validateBufferContent(buffer, arr);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Buffer's position after filling bytes in it")
+            .isEqualTo(bufferSize);
+    // releasing buffer to the pool.
+    pool.putBuffer(buffer);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Position should be reset to 0 after returning buffer to the pool")
+            .isEqualTo(0);
+
+  }
+
+  @Test
+  public void testPoolingWithDifferentSizes() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 5);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 15);
+
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    ByteBuffer buffer3 = pool.getBuffer(isDirect, 12);
+    Assertions.assertThat(buffer3.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(15);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(1);
+    pool.putBuffer(buffer);
+    ByteBuffer buffer4 = pool.getBuffer(isDirect, 6);
+    Assertions.assertThat(buffer4.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(10);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(1);
+
+    pool.release();
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool post release")
+            .isEqualTo(0);
+  }
+
+  @Test
+  public void testPoolingWithDifferentInsertionTime() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 10);
+
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    ByteBuffer buffer3 = pool.getBuffer(isDirect, 10);
+    // As buffer1 is returned to the pool before buffer2, it should
+    // be returned when buffer of same size is asked again from
+    // the pool.
+    Assertions.assertThat(buffer3 == buffer1)

Review Comment:
   Use an equality assertion here rather than asserting on the result of `==`.



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMoreWeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.nio.ByteBuffer;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+/**
+ * Non parameterized tests for {@code WeakReferencedElasticByteBufferPool}.
+ */
+public class TestMoreWeakReferencedElasticByteBufferPool {

Review Comment:
   1. Extend HadoopTestBase to pick up its timeout, thread naming, etc.
   2. Add a test which asks for 0 bytes.
   3. Add a test which asks for -1 bytes.
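
For reference, this is how the underlying JDK allocators treat those two lengths. A pool that falls through to ByteBuffer.allocate() or ByteBuffer.allocateDirect() when nothing is pooled would see the same behaviour, but what the pool itself should do is for the PR to decide. The class and helper names below are hypothetical.

```java
import java.nio.ByteBuffer;

class EdgeLengthSketch {
  // Hypothetical helper mirroring the allocation fallback in the diff.
  static ByteBuffer allocate(boolean direct, int length) {
    return direct
        ? ByteBuffer.allocateDirect(length)
        : ByteBuffer.allocate(length);
  }

  // True if the JDK rejects the requested length.
  static boolean rejects(boolean direct, int length) {
    try {
      allocate(direct, length);
      return false;
    } catch (IllegalArgumentException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    // A zero-length request is legal and yields an empty buffer.
    System.out.println(allocate(false, 0).capacity());
    // A negative length is rejected with IllegalArgumentException.
    System.out.println(rejects(false, -1));
    System.out.println(rejects(true, -1));
  }
}
```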





[GitHub] [hadoop] steveloughran commented on a diff in pull request #4263: HADOOP-18105 Implement buffer pooling with weak references

Posted by GitBox <gi...@apache.org>.
steveloughran commented on code in PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#discussion_r884681833


##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMoreWeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Non parameterized tests for {@code WeakReferencedElasticByteBufferPool}.
+ */
+public class TestMoreWeakReferencedElasticByteBufferPool

Review Comment:
   As long as it fails in the same method where it is passed in, I'm happy. It's the delayed failures which are the pain point.
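
A small sketch of what failing in the same method can look like: the argument is validated on entry, before any pool state is touched. The class and method names are hypothetical and the checks chosen are assumptions, since the thread does not show which input was under discussion.

```java
import java.nio.ByteBuffer;
import java.util.Objects;

class FailFastSketch {
  // Rejects a null buffer at the call site instead of at a later lookup.
  static ByteBuffer checkedPut(ByteBuffer buffer) {
    Objects.requireNonNull(buffer, "buffer");
    buffer.clear();
    return buffer;
  }

  // Rejects a negative length before any allocation is attempted.
  static int checkedLength(int length) {
    if (length < 0) {
      throw new IllegalArgumentException("length must be >= 0: " + length);
    }
    return length;
  }

  public static void main(String[] args) {
    System.out.println(checkedLength(5));
    System.out.println(checkedPut(ByteBuffer.allocate(5)).position());
  }
}
```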





[GitHub] [hadoop] mukund-thakur commented on a diff in pull request #4263: HADOOP-18105 Implement buffer pooling with weak references

Posted by GitBox <gi...@apache.org>.
mukund-thakur commented on code in PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#discussion_r875305357


##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Unit tests for {@code WeakReferencedElasticByteBufferPool}.
+ */
+@RunWith(Parameterized.class)
+public class TestWeakReferencedElasticByteBufferPool {
+
+  private final boolean isDirect;
+
+  private final String type;
+
+  @Parameterized.Parameters(name = "Buffer type : {0}")
+  public static List<String> params() {
+    return Arrays.asList("direct", "array");
+  }
+
+  public TestWeakReferencedElasticByteBufferPool(String type) {
+    this.type = type;
+    this.isDirect = !"array".equals(type);
+  }
+
+  // Add more tests for different time and same size buffers in the pool. 
+  @Test
+  public void testGetAndPutBasic() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    int bufferSize = 5;
+    ByteBuffer buffer = pool.getBuffer(isDirect, bufferSize);
+    Assertions.assertThat(buffer.isDirect())
+            .describedAs("Buffer returned should be of correct type %s", type)
+            .isEqualTo(isDirect);
+    Assertions.assertThat(buffer.capacity())
+            .describedAs("Initial capacity of returned buffer from pool")
+            .isEqualTo(bufferSize);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Initial position of returned buffer from pool")
+            .isEqualTo(0);
+
+    byte[] arr = createByteArray(bufferSize);
+    buffer.put(arr, 0, arr.length);
+    buffer.flip();
+    validateBufferContent(buffer, arr);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Buffer's position after filling bytes in it")
+            .isEqualTo(bufferSize);
+    // releasing buffer to the pool.
+    pool.putBuffer(buffer);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Position should be reset to 0 after returning buffer to the pool")
+            .isEqualTo(0);
+
+  }
+
+  @Test
+  public void testPoolingWithDifferentSizes() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 5);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 15);
+
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    ByteBuffer buffer3 = pool.getBuffer(isDirect, 12);
+    Assertions.assertThat(buffer3.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(15);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(1);
+    pool.putBuffer(buffer);
+    ByteBuffer buffer4 = pool.getBuffer(isDirect, 6);
+    Assertions.assertThat(buffer4.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(10);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(1);
+
+    pool.release();
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool post release")
+            .isEqualTo(0);
+  }
+
+  @Test
+  public void testPoolingWithDifferentInsertionTime() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 10);
+
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    ByteBuffer buffer3 = pool.getBuffer(isDirect, 10);
+    // As buffer1 is returned to the pool before buffer2, it should
+    // be returned when buffer of same size is asked again from
+    // the pool.
+    Assertions.assertThat(buffer3 == buffer1)

Review Comment:
   This was done explicitly so that the object references of the two buffers have to match as well; isEqualTo would only compare the content.
   Anyway, I found isSameAs, which does what is required here.
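
The distinction can be shown without any test library. In AssertJ, `isEqualTo` compares with `equals()` and `isSameAs` compares with `==`. The sketch below uses plain Java, and the class and method names are illustrative only.

```java
import java.nio.ByteBuffer;

/** Contrasts content equality with reference identity for ByteBuffer. */
final class BufferIdentityDemo {

  /** True only when both variables refer to the same object. */
  static boolean sameInstance(ByteBuffer a, ByteBuffer b) {
    return a == b;
  }

  /** True when the remaining bytes match, even for distinct objects. */
  static boolean sameContent(ByteBuffer a, ByteBuffer b) {
    return a.equals(b);
  }

  public static void main(String[] args) {
    ByteBuffer first = ByteBuffer.allocate(10);
    ByteBuffer second = ByteBuffer.allocate(10);
    System.out.println(sameContent(first, second));   // true
    System.out.println(sameInstance(first, second));  // false
    System.out.println(sameInstance(first, first));   // true
  }
}
```

Two freshly allocated buffers of the same size are equal by content, so a content check alone could not prove that the pool handed back the very buffer that was returned to it earlier.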





[GitHub] [hadoop] steveloughran commented on a diff in pull request #4263: HADOOP-18105 Implement buffer pooling with weak references

Posted by GitBox <gi...@apache.org>.
steveloughran commented on code in PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#discussion_r875762072


##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Unit tests for {@code WeakReferencedElasticByteBufferPool}.
+ */
+@RunWith(Parameterized.class)
+public class TestWeakReferencedElasticByteBufferPool {
+
+  private final boolean isDirect;
+
+  private final String type;
+
+  @Parameterized.Parameters(name = "Buffer type : {0}")
+  public static List<String> params() {
+    return Arrays.asList("direct", "array");
+  }
+
+  public TestWeakReferencedElasticByteBufferPool(String type) {
+    this.type = type;
+    this.isDirect = !"array".equals(type);
+  }
+
+  // Add more tests for different time and same size buffers in the pool. 
+  @Test
+  public void testGetAndPutBasic() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    int bufferSize = 5;
+    ByteBuffer buffer = pool.getBuffer(isDirect, bufferSize);
+    Assertions.assertThat(buffer.isDirect())
+            .describedAs("Buffer returned should be of correct type %s", type)
+            .isEqualTo(isDirect);
+    Assertions.assertThat(buffer.capacity())
+            .describedAs("Initial capacity of returned buffer from pool")
+            .isEqualTo(bufferSize);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Initial position of returned buffer from pool")
+            .isEqualTo(0);
+
+    byte[] arr = createByteArray(bufferSize);
+    buffer.put(arr, 0, arr.length);
+    buffer.flip();
+    validateBufferContent(buffer, arr);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Buffer's position after filling bytes in it")
+            .isEqualTo(bufferSize);
+    // releasing buffer to the pool.
+    pool.putBuffer(buffer);
+    Assertions.assertThat(buffer.position())
+            .describedAs("Position should be reset to 0 after returning buffer to the pool")
+            .isEqualTo(0);
+
+  }
+
+  @Test
+  public void testPoolingWithDifferentSizes() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 5);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 15);
+
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    ByteBuffer buffer3 = pool.getBuffer(isDirect, 12);
+    Assertions.assertThat(buffer3.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(15);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(1);
+    pool.putBuffer(buffer);
+    ByteBuffer buffer4 = pool.getBuffer(isDirect, 6);
+    Assertions.assertThat(buffer4.capacity())
+            .describedAs("Pooled buffer should have older capacity")
+            .isEqualTo(10);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(1);
+
+    pool.release();
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool post release")
+            .isEqualTo(0);
+  }
+
+  @Test
+  public void testPoolingWithDifferentInsertionTime() {
+    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
+    ByteBuffer buffer2 = pool.getBuffer(isDirect, 10);
+
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(0);
+
+    pool.putBuffer(buffer1);
+    pool.putBuffer(buffer2);
+    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
+            .describedAs("Number of buffers in the pool")
+            .isEqualTo(2);
+    ByteBuffer buffer3 = pool.getBuffer(isDirect, 10);
+    // As buffer1 is returned to the pool before buffer2, it should
+    // be returned when buffer of same size is asked again from
+    // the pool.
+    Assertions.assertThat(buffer3 == buffer1)

Review Comment:
   I see. And yes, I missed that == was object reference equality, which is silly, as *nobody ever uses this intentionally*.





[GitHub] [hadoop] mukund-thakur commented on a diff in pull request #4263: HADOOP-18105 Implement buffer pooling with weak references

Posted by GitBox <gi...@apache.org>.
mukund-thakur commented on code in PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#discussion_r881996475


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WeakReferencedElasticByteBufferPool.java:
##########
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+
+/**
+ * Buffer pool implementation which uses weak references to store
+ * buffers in the pool, such that they are garbage collected when
+ * there are no references to the buffer during a gc run. This is
+ * important as direct buffers don't get garbage collected automatically
+ * during a gc run as they are not stored on heap memory.
+ * Also the buffers are stored in a tree map which helps in returning
+ * smallest buffer whose size is just greater than requested length.
+ * This is a thread safe implementation.
+ */
+public final class WeakReferencedElasticByteBufferPool extends ElasticByteBufferPool {
+
+  private final TreeMap<Key, WeakReference<ByteBuffer>> directBuffers =

Review Comment:
   We need a TreeMap because we have to find the pooled buffer with the next capacity at or above the requested size.
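
A minimal sketch of that lookup, assuming a map keyed by capacity alone. The class `CeilingLookupSketch` is hypothetical, single-threaded, and simpler than the class under review, which uses its own `Key` type. `TreeMap.ceilingEntry` returns the entry with the least key greater than or equal to the argument, which is the operation a hash map cannot offer.

```java
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of a capacity-ordered pool holding weak references. */
final class CeilingLookupSketch {

  // Keyed by capacity only: two buffers of equal capacity would overwrite
  // each other here, so a real pool needs a richer key.
  private final TreeMap<Integer, WeakReference<ByteBuffer>> buffers = new TreeMap<>();

  void put(ByteBuffer buffer) {
    buffer.clear();
    buffers.put(buffer.capacity(), new WeakReference<>(buffer));
  }

  /** Returns the smallest pooled buffer with capacity >= length, else a new one. */
  ByteBuffer get(int length) {
    Map.Entry<Integer, WeakReference<ByteBuffer>> entry = buffers.ceilingEntry(length);
    while (entry != null) {
      buffers.remove(entry.getKey());
      ByteBuffer candidate = entry.getValue().get();
      if (candidate != null) {
        return candidate;
      }
      // The referent was garbage collected; try the next larger entry.
      entry = buffers.ceilingEntry(length);
    }
    return ByteBuffer.allocate(length);
  }

  int size() {
    return buffers.size();
  }

  public static void main(String[] args) {
    CeilingLookupSketch pool = new CeilingLookupSketch();
    ByteBuffer ten = ByteBuffer.allocate(10);
    ByteBuffer fifteen = ByteBuffer.allocate(15);
    pool.put(ten);
    pool.put(fifteen);
    // The caller still holds strong references, so neither entry is cleared.
    System.out.println(pool.get(12) == fifteen);  // true
  }
}
```

Skipping cleared references in a loop is a choice made for this sketch; the excerpt does not show how the real implementation handles a collected buffer.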





[GitHub] [hadoop] mukund-thakur commented on pull request #4263: HADOOP-18105 Implement buffer pooling with weak references

Posted by GitBox <gi...@apache.org>.
mukund-thakur commented on PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#issuecomment-1137774197

   > deciding how to handle an unknown buffer being returned is the key one
   
   What do you mean by this? I don't think there is a way to know whether the buffer currently being returned through putBuffer() was handed out by this pool or not.
   Right now the putBuffer() call succeeds if I create a random buffer directly in my tests.
   




[GitHub] [hadoop] steveloughran commented on pull request #4263: HADOOP-18105 Implement buffer pooling with weak references

Posted by GitBox <gi...@apache.org>.
steveloughran commented on PR #4263:
URL: https://github.com/apache/hadoop/pull/4263#issuecomment-1140990996

   > Right now the putBuffer() call succeeds if I create a random buffer directly in my tests.
   
   OK, we treat that as a success. Just add a javadoc warning about this and say "may change in future".
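
One possible wording for such a warning, shown on a hypothetical stand-in class. `LenientPoolSketch` and its deque-based storage are illustrative assumptions; only the method name `putBuffer` and the "may change in future" phrasing come from this thread.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical pool that accepts any buffer, documented as such. */
final class LenientPoolSketch {

  private final Deque<ByteBuffer> pooled = new ArrayDeque<>();

  /**
   * Return a buffer to the pool.
   * <p>
   * Warning: the pool does not check whether the buffer was originally
   * obtained from it, so a buffer created elsewhere is accepted as well.
   * This behaviour may change in future.
   *
   * @param buffer buffer to return to the pool.
   */
  void putBuffer(ByteBuffer buffer) {
    buffer.clear();
    pooled.addLast(buffer);
  }

  int pooledCount() {
    return pooled.size();
  }
}
```

Documenting the leniency keeps the current behaviour usable while leaving room to reject foreign buffers later without breaking a stated contract.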

