You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/06/24 02:27:48 UTC

[kafka] branch 1.1 updated: Revert "MINOR: Avoid coarse lock in Pool#getAndMaybePut (#5258)"

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 39d28f9  Revert "MINOR: Avoid coarse lock in Pool#getAndMaybePut (#5258)"
39d28f9 is described below

commit 39d28f99f2fd5a265e9f5e9b8308eb01e12643c7
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Sat Jun 23 19:27:00 2018 -0700

    Revert "MINOR: Avoid coarse lock in Pool#getAndMaybePut (#5258)"
    
    It requires Java 8 and 1.1 still supports Java 7.
    
    This reverts commit dc3acb065a62972b43988b6a26f1b85c12e648ad.
---
 core/src/main/scala/kafka/utils/Pool.scala | 20 ++++++++++++++++----
 1 file changed, 16 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala
index f19dd98..4ddf557 100644
--- a/core/src/main/scala/kafka/utils/Pool.scala
+++ b/core/src/main/scala/kafka/utils/Pool.scala
@@ -26,6 +26,7 @@ import kafka.common.KafkaException
 class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
 
   private val pool: ConcurrentMap[K, V] = new ConcurrentHashMap[K, V]
+  private val createLock = new Object
   
   def put(k: K, v: V): V = pool.put(k, v)
   
@@ -55,10 +56,21 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
     * @param createValue Factory function.
     * @return The final value associated with the key.
     */
-  def getAndMaybePut(key: K, createValue: => V): V =
-    pool.computeIfAbsent(key, new java.util.function.Function[K, V] {
-      override def apply(k: K): V = createValue
-    })
+  def getAndMaybePut(key: K, createValue: => V): V = {
+    val current = pool.get(key)
+    if (current == null) {
+      createLock synchronized {
+        val current = pool.get(key)
+        if (current == null) {
+          val value = createValue
+          pool.put(key, value)
+          value
+        }
+        else current
+      }
+    }
+    else current
+  }
 
   def contains(id: K): Boolean = pool.containsKey(id)