You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/11/10 09:44:47 UTC

kafka git commit: MINOR: Use Scala Future in CoreUtils test

Repository: kafka
Updated Branches:
  refs/heads/trunk 14e3ed048 -> b9039bbd4


MINOR: Use Scala Future in CoreUtils test

Also rename UtilsTest to CoreUtilsTest and note
that `getOrElseUpdate` has the right behaviour
in Scala 2.12.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Rajini Sivaram <ra...@googlemail.com>

Closes #4142 from ijuma/use-scala-futures-in-core-utils-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b9039bbd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b9039bbd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b9039bbd

Branch: refs/heads/trunk
Commit: b9039bbd4b3d8eb231c9f2d1d40042706f8f4f19
Parents: 14e3ed0
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Nov 10 09:44:43 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Nov 10 09:44:43 2017 +0000

----------------------------------------------------------------------
 core/src/main/scala/kafka/utils/CoreUtils.scala |   4 +
 .../scala/unit/kafka/utils/CoreUtilsTest.scala  | 210 ++++++++++++++++++
 .../test/scala/unit/kafka/utils/UtilsTest.scala | 213 -------------------
 3 files changed, 214 insertions(+), 213 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b9039bbd/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index b776406..7a853d5 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -309,6 +309,10 @@ object CoreUtils extends Logging {
    * keys often exist in the map, avoiding the need to create a new value. `createValue`
    * may be invoked more than once if multiple threads attempt to insert a key at the same
    * time, but the same inserted value will be returned to all threads.
+   *
+   * In Scala 2.12, `ConcurrentMap.getOrElse` has the same behaviour as this method, but that
+   * is not the case in Scala 2.11. We can remove this method once we drop support for Scala
+   * 2.11.
    */
   def atomicGetOrUpdate[K, V](map: concurrent.Map[K, V], key: K, createValue: => V): V = {
     map.get(key) match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b9039bbd/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
new file mode 100755
index 0000000..5e607be
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.util.{Arrays, UUID}
+import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.locks.ReentrantLock
+import java.nio.ByteBuffer
+import java.util.regex.Pattern
+
+import org.apache.log4j.Logger
+import org.scalatest.junit.JUnitSuite
+import org.junit.Assert._
+import kafka.common.KafkaException
+import kafka.utils.CoreUtils.inLock
+import org.junit.Test
+import org.apache.kafka.common.utils.{Base64, Utils}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
+
+class CoreUtilsTest extends JUnitSuite {
+
+  private val logger = Logger.getLogger(classOf[CoreUtilsTest])
+  val clusterIdPattern = Pattern.compile("[a-zA-Z0-9_\\-]+")
+
+  @Test
+  def testSwallow() {
+    CoreUtils.swallow(logger.info, throw new KafkaException("test"))
+  }
+
+  @Test
+  def testCircularIterator() {
+    val l = List(1, 2)
+    val itl = CoreUtils.circularIterator(l)
+    assertEquals(1, itl.next())
+    assertEquals(2, itl.next())
+    assertEquals(1, itl.next())
+    assertEquals(2, itl.next())
+    assertFalse(itl.hasDefiniteSize)
+
+    val s = Set(1, 2)
+    val its = CoreUtils.circularIterator(s)
+    assertEquals(1, its.next())
+    assertEquals(2, its.next())
+    assertEquals(1, its.next())
+    assertEquals(2, its.next())
+    assertEquals(1, its.next())
+  }
+
+  @Test
+  def testReadBytes() {
+    for(testCase <- List("", "a", "abcd")) {
+      val bytes = testCase.getBytes
+      assertTrue(Arrays.equals(bytes, Utils.readBytes(ByteBuffer.wrap(bytes))))
+    }
+  }
+
+  @Test
+  def testAbs() {
+    assertEquals(0, Utils.abs(Integer.MIN_VALUE))
+    assertEquals(1, Utils.abs(-1))
+    assertEquals(0, Utils.abs(0))
+    assertEquals(1, Utils.abs(1))
+    assertEquals(Integer.MAX_VALUE, Utils.abs(Integer.MAX_VALUE))
+  }
+
+  @Test
+  def testReplaceSuffix() {
+    assertEquals("blah.foo.text", CoreUtils.replaceSuffix("blah.foo.txt", ".txt", ".text"))
+    assertEquals("blah.foo", CoreUtils.replaceSuffix("blah.foo.txt", ".txt", ""))
+    assertEquals("txt.txt", CoreUtils.replaceSuffix("txt.txt.txt", ".txt", ""))
+    assertEquals("foo.txt", CoreUtils.replaceSuffix("foo", "", ".txt"))
+  }
+
+  @Test
+  def testReadInt() {
+    val values = Array(0, 1, -1, Byte.MaxValue, Short.MaxValue, 2 * Short.MaxValue, Int.MaxValue/2, Int.MinValue/2, Int.MaxValue, Int.MinValue, Int.MaxValue)
+    val buffer = ByteBuffer.allocate(4 * values.size)
+    for(i <- 0 until values.length) {
+      buffer.putInt(i*4, values(i))
+      assertEquals("Written value should match read value.", values(i), CoreUtils.readInt(buffer.array, i*4))
+    }
+  }
+
+  @Test
+  def testCsvList() {
+    val emptyString:String = ""
+    val nullString:String = null
+    val emptyList = CoreUtils.parseCsvList(emptyString)
+    val emptyListFromNullString = CoreUtils.parseCsvList(nullString)
+    val emptyStringList = Seq.empty[String]
+    assertTrue(emptyList!=null)
+    assertTrue(emptyListFromNullString!=null)
+    assertTrue(emptyStringList.equals(emptyListFromNullString))
+    assertTrue(emptyStringList.equals(emptyList))
+  }
+
+  @Test
+  def testCsvMap() {
+    val emptyString: String = ""
+    val emptyMap = CoreUtils.parseCsvMap(emptyString)
+    val emptyStringMap = Map.empty[String, String]
+    assertTrue(emptyMap != null)
+    assertTrue(emptyStringMap.equals(emptyStringMap))
+
+    val kvPairsIpV6: String = "a:b:c:v,a:b:c:v"
+    val ipv6Map = CoreUtils.parseCsvMap(kvPairsIpV6)
+    for (m <- ipv6Map) {
+      assertTrue(m._1.equals("a:b:c"))
+      assertTrue(m._2.equals("v"))
+    }
+
+    val singleEntry:String = "key:value"
+    val singleMap = CoreUtils.parseCsvMap(singleEntry)
+    val value = singleMap.getOrElse("key", 0)
+    assertTrue(value.equals("value"))
+
+    val kvPairsIpV4: String = "192.168.2.1/30:allow, 192.168.2.1/30:allow"
+    val ipv4Map = CoreUtils.parseCsvMap(kvPairsIpV4)
+    for (m <- ipv4Map) {
+      assertTrue(m._1.equals("192.168.2.1/30"))
+      assertTrue(m._2.equals("allow"))
+    }
+
+    val kvPairsSpaces: String = "key:value      , key:   value"
+    val spaceMap = CoreUtils.parseCsvMap(kvPairsSpaces)
+    for (m <- spaceMap) {
+      assertTrue(m._1.equals("key"))
+      assertTrue(m._2.equals("value"))
+    }
+  }
+
+  @Test
+  def testInLock() {
+    val lock = new ReentrantLock()
+    val result = inLock(lock) {
+      assertTrue("Should be in lock", lock.isHeldByCurrentThread)
+      1 + 1
+    }
+    assertEquals(2, result)
+    assertFalse("Should be unlocked", lock.isLocked)
+  }
+
+  @Test
+  def testUrlSafeBase64EncodeUUID() {
+
+    // Test a UUID that has no + or / characters in base64 encoding [a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46 ->(base64)-> oUm0owbhS0moy4qcSln6Rg==]
+    val clusterId1 = Base64.urlEncoderNoPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString(
+      "a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46")))
+    assertEquals(clusterId1, "oUm0owbhS0moy4qcSln6Rg")
+    assertEquals(clusterId1.length, 22)
+    assertTrue(clusterIdPattern.matcher(clusterId1).matches())
+
+    // Test a UUID that has + or / characters in base64 encoding [d418ec02-277e-4853-81e6-afe30259daec ->(base64)-> 1BjsAid+SFOB5q/jAlna7A==]
+    val clusterId2 = Base64.urlEncoderNoPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString(
+      "d418ec02-277e-4853-81e6-afe30259daec")))
+    assertEquals(clusterId2, "1BjsAid-SFOB5q_jAlna7A")
+    assertEquals(clusterId2.length, 22)
+    assertTrue(clusterIdPattern.matcher(clusterId2).matches())
+  }
+
+  @Test
+  def testGenerateUuidAsBase64() {
+    val clusterId = CoreUtils.generateUuidAsBase64()
+    assertEquals(clusterId.length, 22)
+    assertTrue(clusterIdPattern.matcher(clusterId).matches())
+  }
+
+  @Test
+  def testAtomicGetOrUpdate(): Unit = {
+    val count = 1000
+    val nThreads = 5
+    val createdCount = new AtomicInteger
+    val map = new ConcurrentHashMap[Int, AtomicInteger]().asScala
+    implicit val executionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(nThreads))
+    try {
+      Await.result(Future.traverse(1 to count) { i =>
+        Future {
+          CoreUtils.atomicGetOrUpdate(map, 0, {
+            createdCount.incrementAndGet
+            new AtomicInteger
+          }).incrementAndGet()
+        }
+      }, Duration(1, TimeUnit.MINUTES))
+      assertEquals(count, map(0).get)
+      val created = createdCount.get
+      assertTrue(s"Too many creations $created", created > 0 && created <= nThreads)
+    } finally {
+      executionContext.shutdownNow()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b9039bbd/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
deleted file mode 100755
index 7b43118..0000000
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import java.util.{Arrays, UUID}
-import java.util.concurrent.{ ConcurrentHashMap, Executors }
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.locks.ReentrantLock
-import java.nio.ByteBuffer
-import java.util.regex.Pattern
-
-import org.apache.log4j.Logger
-import org.scalatest.junit.JUnitSuite
-import org.junit.Assert._
-import kafka.common.KafkaException
-import kafka.utils.CoreUtils.inLock
-import org.junit.Test
-import org.apache.kafka.common.utils.{Base64, Utils}
-
-import scala.collection.JavaConverters._
-
-class UtilsTest extends JUnitSuite {
-
-  private val logger = Logger.getLogger(classOf[UtilsTest])
-  val clusterIdPattern = Pattern.compile("[a-zA-Z0-9_\\-]+")
-
-  @Test
-  def testSwallow() {
-    CoreUtils.swallow(logger.info, throw new KafkaException("test"))
-  }
-
-  @Test
-  def testCircularIterator() {
-    val l = List(1, 2)
-    val itl = CoreUtils.circularIterator(l)
-    assertEquals(1, itl.next())
-    assertEquals(2, itl.next())
-    assertEquals(1, itl.next())
-    assertEquals(2, itl.next())
-    assertFalse(itl.hasDefiniteSize)
-
-    val s = Set(1, 2)
-    val its = CoreUtils.circularIterator(s)
-    assertEquals(1, its.next())
-    assertEquals(2, its.next())
-    assertEquals(1, its.next())
-    assertEquals(2, its.next())
-    assertEquals(1, its.next())
-  }
-
-  @Test
-  def testReadBytes() {
-    for(testCase <- List("", "a", "abcd")) {
-      val bytes = testCase.getBytes
-      assertTrue(Arrays.equals(bytes, Utils.readBytes(ByteBuffer.wrap(bytes))))
-    }
-  }
-
-  @Test
-  def testAbs() {
-    assertEquals(0, Utils.abs(Integer.MIN_VALUE))
-    assertEquals(1, Utils.abs(-1))
-    assertEquals(0, Utils.abs(0))
-    assertEquals(1, Utils.abs(1))
-    assertEquals(Integer.MAX_VALUE, Utils.abs(Integer.MAX_VALUE))
-  }
-
-  @Test
-  def testReplaceSuffix() {
-    assertEquals("blah.foo.text", CoreUtils.replaceSuffix("blah.foo.txt", ".txt", ".text"))
-    assertEquals("blah.foo", CoreUtils.replaceSuffix("blah.foo.txt", ".txt", ""))
-    assertEquals("txt.txt", CoreUtils.replaceSuffix("txt.txt.txt", ".txt", ""))
-    assertEquals("foo.txt", CoreUtils.replaceSuffix("foo", "", ".txt"))
-  }
-
-  @Test
-  def testReadInt() {
-    val values = Array(0, 1, -1, Byte.MaxValue, Short.MaxValue, 2 * Short.MaxValue, Int.MaxValue/2, Int.MinValue/2, Int.MaxValue, Int.MinValue, Int.MaxValue)
-    val buffer = ByteBuffer.allocate(4 * values.size)
-    for(i <- 0 until values.length) {
-      buffer.putInt(i*4, values(i))
-      assertEquals("Written value should match read value.", values(i), CoreUtils.readInt(buffer.array, i*4))
-    }
-  }
-
-  @Test
-  def testCsvList() {
-    val emptyString:String = ""
-    val nullString:String = null
-    val emptyList = CoreUtils.parseCsvList(emptyString)
-    val emptyListFromNullString = CoreUtils.parseCsvList(nullString)
-    val emptyStringList = Seq.empty[String]
-    assertTrue(emptyList!=null)
-    assertTrue(emptyListFromNullString!=null)
-    assertTrue(emptyStringList.equals(emptyListFromNullString))
-    assertTrue(emptyStringList.equals(emptyList))
-  }
-
-  @Test
-  def testCsvMap() {
-    val emptyString: String = ""
-    val emptyMap = CoreUtils.parseCsvMap(emptyString)
-    val emptyStringMap = Map.empty[String, String]
-    assertTrue(emptyMap != null)
-    assertTrue(emptyStringMap.equals(emptyStringMap))
-
-    val kvPairsIpV6: String = "a:b:c:v,a:b:c:v"
-    val ipv6Map = CoreUtils.parseCsvMap(kvPairsIpV6)
-    for (m <- ipv6Map) {
-      assertTrue(m._1.equals("a:b:c"))
-      assertTrue(m._2.equals("v"))
-    }
-
-    val singleEntry:String = "key:value"
-    val singleMap = CoreUtils.parseCsvMap(singleEntry)
-    val value = singleMap.getOrElse("key", 0)
-    assertTrue(value.equals("value"))
-
-    val kvPairsIpV4: String = "192.168.2.1/30:allow, 192.168.2.1/30:allow"
-    val ipv4Map = CoreUtils.parseCsvMap(kvPairsIpV4)
-    for (m <- ipv4Map) {
-      assertTrue(m._1.equals("192.168.2.1/30"))
-      assertTrue(m._2.equals("allow"))
-    }
-
-    val kvPairsSpaces: String = "key:value      , key:   value"
-    val spaceMap = CoreUtils.parseCsvMap(kvPairsSpaces)
-    for (m <- spaceMap) {
-      assertTrue(m._1.equals("key"))
-      assertTrue(m._2.equals("value"))
-    }
-  }
-
-  @Test
-  def testInLock() {
-    val lock = new ReentrantLock()
-    val result = inLock(lock) {
-      assertTrue("Should be in lock", lock.isHeldByCurrentThread)
-      1 + 1
-    }
-    assertEquals(2, result)
-    assertFalse("Should be unlocked", lock.isLocked)
-  }
-
-  @Test
-  def testUrlSafeBase64EncodeUUID() {
-
-    // Test a UUID that has no + or / characters in base64 encoding [a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46 ->(base64)-> oUm0owbhS0moy4qcSln6Rg==]
-    val clusterId1 = Base64.urlEncoderNoPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString(
-      "a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46")))
-    assertEquals(clusterId1, "oUm0owbhS0moy4qcSln6Rg")
-    assertEquals(clusterId1.length, 22)
-    assertTrue(clusterIdPattern.matcher(clusterId1).matches())
-
-    // Test a UUID that has + or / characters in base64 encoding [d418ec02-277e-4853-81e6-afe30259daec ->(base64)-> 1BjsAid+SFOB5q/jAlna7A==]
-    val clusterId2 = Base64.urlEncoderNoPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString(
-      "d418ec02-277e-4853-81e6-afe30259daec")))
-    assertEquals(clusterId2, "1BjsAid-SFOB5q_jAlna7A")
-    assertEquals(clusterId2.length, 22)
-    assertTrue(clusterIdPattern.matcher(clusterId2).matches())
-  }
-
-  @Test
-  def testGenerateUuidAsBase64() {
-    val clusterId = CoreUtils.generateUuidAsBase64()
-    assertEquals(clusterId.length, 22)
-    assertTrue(clusterIdPattern.matcher(clusterId).matches())
-  }
-
-  @Test
-  def testGetOrElseUpdateAtomically(): Unit = {
-    val count = 1000
-    val nThreads = 5
-    val createdCount = new AtomicInteger
-    val map = new ConcurrentHashMap[Int, AtomicInteger]().asScala
-    val executor = Executors.newFixedThreadPool(nThreads)
-    try {
-      for (i <- 1 to count) {
-        executor.submit(new Runnable() {
-          def run() {
-            CoreUtils.atomicGetOrUpdate(map, 0, {
-              createdCount.incrementAndGet
-              new AtomicInteger
-            }).incrementAndGet()
-          }
-        })
-      }
-      executor.shutdown()
-      executor.awaitTermination(1000, java.util.concurrent.TimeUnit.MILLISECONDS)
-
-      assertEquals(count, map(0).get)
-      val created = createdCount.get
-      assertTrue(s"Too many creations $created", created > 0 && created <= nThreads)
-    } finally {
-      executor.shutdownNow()
-    }
-  }
-}