You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/11/10 09:44:47 UTC
kafka git commit: MINOR: Use Scala Future in CoreUtils test
Repository: kafka
Updated Branches:
refs/heads/trunk 14e3ed048 -> b9039bbd4
MINOR: Use Scala Future in CoreUtils test
Also rename UtilsTest to CoreUtilsTest and note
that `getOrElseUpdate` has the right behaviour
in Scala 2.12.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Rajini Sivaram <ra...@googlemail.com>
Closes #4142 from ijuma/use-scala-futures-in-core-utils-test
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b9039bbd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b9039bbd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b9039bbd
Branch: refs/heads/trunk
Commit: b9039bbd4b3d8eb231c9f2d1d40042706f8f4f19
Parents: 14e3ed0
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Nov 10 09:44:43 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Nov 10 09:44:43 2017 +0000
----------------------------------------------------------------------
core/src/main/scala/kafka/utils/CoreUtils.scala | 4 +
.../scala/unit/kafka/utils/CoreUtilsTest.scala | 210 ++++++++++++++++++
.../test/scala/unit/kafka/utils/UtilsTest.scala | 213 -------------------
3 files changed, 214 insertions(+), 213 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b9039bbd/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index b776406..7a853d5 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -309,6 +309,10 @@ object CoreUtils extends Logging {
* keys often exist in the map, avoiding the need to create a new value. `createValue`
* may be invoked more than once if multiple threads attempt to insert a key at the same
* time, but the same inserted value will be returned to all threads.
+ *
+ * In Scala 2.12, `ConcurrentMap.getOrElseUpdate` has the same behaviour as this method, but that
+ * is not the case in Scala 2.11. We can remove this method once we drop support for Scala
+ * 2.11.
*/
def atomicGetOrUpdate[K, V](map: concurrent.Map[K, V], key: K, createValue: => V): V = {
map.get(key) match {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b9039bbd/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
new file mode 100755
index 0000000..5e607be
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.util.{Arrays, UUID}
+import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.locks.ReentrantLock
+import java.nio.ByteBuffer
+import java.util.regex.Pattern
+
+import org.apache.log4j.Logger
+import org.scalatest.junit.JUnitSuite
+import org.junit.Assert._
+import kafka.common.KafkaException
+import kafka.utils.CoreUtils.inLock
+import org.junit.Test
+import org.apache.kafka.common.utils.{Base64, Utils}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
+
+class CoreUtilsTest extends JUnitSuite {
+
+ private val logger = Logger.getLogger(classOf[CoreUtilsTest])
+ val clusterIdPattern = Pattern.compile("[a-zA-Z0-9_\\-]+")
+
+ @Test
+ def testSwallow() {
+ CoreUtils.swallow(logger.info, throw new KafkaException("test"))
+ }
+
+ @Test
+ def testCircularIterator() {
+ val l = List(1, 2)
+ val itl = CoreUtils.circularIterator(l)
+ assertEquals(1, itl.next())
+ assertEquals(2, itl.next())
+ assertEquals(1, itl.next())
+ assertEquals(2, itl.next())
+ assertFalse(itl.hasDefiniteSize)
+
+ val s = Set(1, 2)
+ val its = CoreUtils.circularIterator(s)
+ assertEquals(1, its.next())
+ assertEquals(2, its.next())
+ assertEquals(1, its.next())
+ assertEquals(2, its.next())
+ assertEquals(1, its.next())
+ }
+
+ @Test
+ def testReadBytes() {
+ for(testCase <- List("", "a", "abcd")) {
+ val bytes = testCase.getBytes
+ assertTrue(Arrays.equals(bytes, Utils.readBytes(ByteBuffer.wrap(bytes))))
+ }
+ }
+
+ @Test
+ def testAbs() {
+ assertEquals(0, Utils.abs(Integer.MIN_VALUE))
+ assertEquals(1, Utils.abs(-1))
+ assertEquals(0, Utils.abs(0))
+ assertEquals(1, Utils.abs(1))
+ assertEquals(Integer.MAX_VALUE, Utils.abs(Integer.MAX_VALUE))
+ }
+
+ @Test
+ def testReplaceSuffix() {
+ assertEquals("blah.foo.text", CoreUtils.replaceSuffix("blah.foo.txt", ".txt", ".text"))
+ assertEquals("blah.foo", CoreUtils.replaceSuffix("blah.foo.txt", ".txt", ""))
+ assertEquals("txt.txt", CoreUtils.replaceSuffix("txt.txt.txt", ".txt", ""))
+ assertEquals("foo.txt", CoreUtils.replaceSuffix("foo", "", ".txt"))
+ }
+
+ @Test
+ def testReadInt() {
+ val values = Array(0, 1, -1, Byte.MaxValue, Short.MaxValue, 2 * Short.MaxValue, Int.MaxValue/2, Int.MinValue/2, Int.MaxValue, Int.MinValue, Int.MaxValue)
+ val buffer = ByteBuffer.allocate(4 * values.size)
+ for(i <- 0 until values.length) {
+ buffer.putInt(i*4, values(i))
+ assertEquals("Written value should match read value.", values(i), CoreUtils.readInt(buffer.array, i*4))
+ }
+ }
+
+ @Test
+ def testCsvList() {
+ val emptyString:String = ""
+ val nullString:String = null
+ val emptyList = CoreUtils.parseCsvList(emptyString)
+ val emptyListFromNullString = CoreUtils.parseCsvList(nullString)
+ val emptyStringList = Seq.empty[String]
+ assertTrue(emptyList!=null)
+ assertTrue(emptyListFromNullString!=null)
+ assertTrue(emptyStringList.equals(emptyListFromNullString))
+ assertTrue(emptyStringList.equals(emptyList))
+ }
+
+ @Test
+ def testCsvMap() {
+ val emptyString: String = ""
+ val emptyMap = CoreUtils.parseCsvMap(emptyString)
+ val emptyStringMap = Map.empty[String, String]
+ assertTrue(emptyMap != null)
+ assertTrue(emptyStringMap.equals(emptyStringMap))
+
+ val kvPairsIpV6: String = "a:b:c:v,a:b:c:v"
+ val ipv6Map = CoreUtils.parseCsvMap(kvPairsIpV6)
+ for (m <- ipv6Map) {
+ assertTrue(m._1.equals("a:b:c"))
+ assertTrue(m._2.equals("v"))
+ }
+
+ val singleEntry:String = "key:value"
+ val singleMap = CoreUtils.parseCsvMap(singleEntry)
+ val value = singleMap.getOrElse("key", 0)
+ assertTrue(value.equals("value"))
+
+ val kvPairsIpV4: String = "192.168.2.1/30:allow, 192.168.2.1/30:allow"
+ val ipv4Map = CoreUtils.parseCsvMap(kvPairsIpV4)
+ for (m <- ipv4Map) {
+ assertTrue(m._1.equals("192.168.2.1/30"))
+ assertTrue(m._2.equals("allow"))
+ }
+
+ val kvPairsSpaces: String = "key:value , key: value"
+ val spaceMap = CoreUtils.parseCsvMap(kvPairsSpaces)
+ for (m <- spaceMap) {
+ assertTrue(m._1.equals("key"))
+ assertTrue(m._2.equals("value"))
+ }
+ }
+
+ @Test
+ def testInLock() {
+ val lock = new ReentrantLock()
+ val result = inLock(lock) {
+ assertTrue("Should be in lock", lock.isHeldByCurrentThread)
+ 1 + 1
+ }
+ assertEquals(2, result)
+ assertFalse("Should be unlocked", lock.isLocked)
+ }
+
+ @Test
+ def testUrlSafeBase64EncodeUUID() {
+
+ // Test a UUID that has no + or / characters in base64 encoding [a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46 ->(base64)-> oUm0owbhS0moy4qcSln6Rg==]
+ val clusterId1 = Base64.urlEncoderNoPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString(
+ "a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46")))
+ assertEquals(clusterId1, "oUm0owbhS0moy4qcSln6Rg")
+ assertEquals(clusterId1.length, 22)
+ assertTrue(clusterIdPattern.matcher(clusterId1).matches())
+
+ // Test a UUID that has + or / characters in base64 encoding [d418ec02-277e-4853-81e6-afe30259daec ->(base64)-> 1BjsAid+SFOB5q/jAlna7A==]
+ val clusterId2 = Base64.urlEncoderNoPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString(
+ "d418ec02-277e-4853-81e6-afe30259daec")))
+ assertEquals(clusterId2, "1BjsAid-SFOB5q_jAlna7A")
+ assertEquals(clusterId2.length, 22)
+ assertTrue(clusterIdPattern.matcher(clusterId2).matches())
+ }
+
+ @Test
+ def testGenerateUuidAsBase64() {
+ val clusterId = CoreUtils.generateUuidAsBase64()
+ assertEquals(clusterId.length, 22)
+ assertTrue(clusterIdPattern.matcher(clusterId).matches())
+ }
+
+ @Test
+ def testAtomicGetOrUpdate(): Unit = {
+ val count = 1000
+ val nThreads = 5
+ val createdCount = new AtomicInteger
+ val map = new ConcurrentHashMap[Int, AtomicInteger]().asScala
+ implicit val executionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(nThreads))
+ try {
+ Await.result(Future.traverse(1 to count) { i =>
+ Future {
+ CoreUtils.atomicGetOrUpdate(map, 0, {
+ createdCount.incrementAndGet
+ new AtomicInteger
+ }).incrementAndGet()
+ }
+ }, Duration(1, TimeUnit.MINUTES))
+ assertEquals(count, map(0).get)
+ val created = createdCount.get
+ assertTrue(s"Too many creations $created", created > 0 && created <= nThreads)
+ } finally {
+ executionContext.shutdownNow()
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b9039bbd/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
deleted file mode 100755
index 7b43118..0000000
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import java.util.{Arrays, UUID}
-import java.util.concurrent.{ ConcurrentHashMap, Executors }
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.locks.ReentrantLock
-import java.nio.ByteBuffer
-import java.util.regex.Pattern
-
-import org.apache.log4j.Logger
-import org.scalatest.junit.JUnitSuite
-import org.junit.Assert._
-import kafka.common.KafkaException
-import kafka.utils.CoreUtils.inLock
-import org.junit.Test
-import org.apache.kafka.common.utils.{Base64, Utils}
-
-import scala.collection.JavaConverters._
-
-class UtilsTest extends JUnitSuite {
-
- private val logger = Logger.getLogger(classOf[UtilsTest])
- val clusterIdPattern = Pattern.compile("[a-zA-Z0-9_\\-]+")
-
- @Test
- def testSwallow() {
- CoreUtils.swallow(logger.info, throw new KafkaException("test"))
- }
-
- @Test
- def testCircularIterator() {
- val l = List(1, 2)
- val itl = CoreUtils.circularIterator(l)
- assertEquals(1, itl.next())
- assertEquals(2, itl.next())
- assertEquals(1, itl.next())
- assertEquals(2, itl.next())
- assertFalse(itl.hasDefiniteSize)
-
- val s = Set(1, 2)
- val its = CoreUtils.circularIterator(s)
- assertEquals(1, its.next())
- assertEquals(2, its.next())
- assertEquals(1, its.next())
- assertEquals(2, its.next())
- assertEquals(1, its.next())
- }
-
- @Test
- def testReadBytes() {
- for(testCase <- List("", "a", "abcd")) {
- val bytes = testCase.getBytes
- assertTrue(Arrays.equals(bytes, Utils.readBytes(ByteBuffer.wrap(bytes))))
- }
- }
-
- @Test
- def testAbs() {
- assertEquals(0, Utils.abs(Integer.MIN_VALUE))
- assertEquals(1, Utils.abs(-1))
- assertEquals(0, Utils.abs(0))
- assertEquals(1, Utils.abs(1))
- assertEquals(Integer.MAX_VALUE, Utils.abs(Integer.MAX_VALUE))
- }
-
- @Test
- def testReplaceSuffix() {
- assertEquals("blah.foo.text", CoreUtils.replaceSuffix("blah.foo.txt", ".txt", ".text"))
- assertEquals("blah.foo", CoreUtils.replaceSuffix("blah.foo.txt", ".txt", ""))
- assertEquals("txt.txt", CoreUtils.replaceSuffix("txt.txt.txt", ".txt", ""))
- assertEquals("foo.txt", CoreUtils.replaceSuffix("foo", "", ".txt"))
- }
-
- @Test
- def testReadInt() {
- val values = Array(0, 1, -1, Byte.MaxValue, Short.MaxValue, 2 * Short.MaxValue, Int.MaxValue/2, Int.MinValue/2, Int.MaxValue, Int.MinValue, Int.MaxValue)
- val buffer = ByteBuffer.allocate(4 * values.size)
- for(i <- 0 until values.length) {
- buffer.putInt(i*4, values(i))
- assertEquals("Written value should match read value.", values(i), CoreUtils.readInt(buffer.array, i*4))
- }
- }
-
- @Test
- def testCsvList() {
- val emptyString:String = ""
- val nullString:String = null
- val emptyList = CoreUtils.parseCsvList(emptyString)
- val emptyListFromNullString = CoreUtils.parseCsvList(nullString)
- val emptyStringList = Seq.empty[String]
- assertTrue(emptyList!=null)
- assertTrue(emptyListFromNullString!=null)
- assertTrue(emptyStringList.equals(emptyListFromNullString))
- assertTrue(emptyStringList.equals(emptyList))
- }
-
- @Test
- def testCsvMap() {
- val emptyString: String = ""
- val emptyMap = CoreUtils.parseCsvMap(emptyString)
- val emptyStringMap = Map.empty[String, String]
- assertTrue(emptyMap != null)
- assertTrue(emptyStringMap.equals(emptyStringMap))
-
- val kvPairsIpV6: String = "a:b:c:v,a:b:c:v"
- val ipv6Map = CoreUtils.parseCsvMap(kvPairsIpV6)
- for (m <- ipv6Map) {
- assertTrue(m._1.equals("a:b:c"))
- assertTrue(m._2.equals("v"))
- }
-
- val singleEntry:String = "key:value"
- val singleMap = CoreUtils.parseCsvMap(singleEntry)
- val value = singleMap.getOrElse("key", 0)
- assertTrue(value.equals("value"))
-
- val kvPairsIpV4: String = "192.168.2.1/30:allow, 192.168.2.1/30:allow"
- val ipv4Map = CoreUtils.parseCsvMap(kvPairsIpV4)
- for (m <- ipv4Map) {
- assertTrue(m._1.equals("192.168.2.1/30"))
- assertTrue(m._2.equals("allow"))
- }
-
- val kvPairsSpaces: String = "key:value , key: value"
- val spaceMap = CoreUtils.parseCsvMap(kvPairsSpaces)
- for (m <- spaceMap) {
- assertTrue(m._1.equals("key"))
- assertTrue(m._2.equals("value"))
- }
- }
-
- @Test
- def testInLock() {
- val lock = new ReentrantLock()
- val result = inLock(lock) {
- assertTrue("Should be in lock", lock.isHeldByCurrentThread)
- 1 + 1
- }
- assertEquals(2, result)
- assertFalse("Should be unlocked", lock.isLocked)
- }
-
- @Test
- def testUrlSafeBase64EncodeUUID() {
-
- // Test a UUID that has no + or / characters in base64 encoding [a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46 ->(base64)-> oUm0owbhS0moy4qcSln6Rg==]
- val clusterId1 = Base64.urlEncoderNoPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString(
- "a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46")))
- assertEquals(clusterId1, "oUm0owbhS0moy4qcSln6Rg")
- assertEquals(clusterId1.length, 22)
- assertTrue(clusterIdPattern.matcher(clusterId1).matches())
-
- // Test a UUID that has + or / characters in base64 encoding [d418ec02-277e-4853-81e6-afe30259daec ->(base64)-> 1BjsAid+SFOB5q/jAlna7A==]
- val clusterId2 = Base64.urlEncoderNoPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString(
- "d418ec02-277e-4853-81e6-afe30259daec")))
- assertEquals(clusterId2, "1BjsAid-SFOB5q_jAlna7A")
- assertEquals(clusterId2.length, 22)
- assertTrue(clusterIdPattern.matcher(clusterId2).matches())
- }
-
- @Test
- def testGenerateUuidAsBase64() {
- val clusterId = CoreUtils.generateUuidAsBase64()
- assertEquals(clusterId.length, 22)
- assertTrue(clusterIdPattern.matcher(clusterId).matches())
- }
-
- @Test
- def testGetOrElseUpdateAtomically(): Unit = {
- val count = 1000
- val nThreads = 5
- val createdCount = new AtomicInteger
- val map = new ConcurrentHashMap[Int, AtomicInteger]().asScala
- val executor = Executors.newFixedThreadPool(nThreads)
- try {
- for (i <- 1 to count) {
- executor.submit(new Runnable() {
- def run() {
- CoreUtils.atomicGetOrUpdate(map, 0, {
- createdCount.incrementAndGet
- new AtomicInteger
- }).incrementAndGet()
- }
- })
- }
- executor.shutdown()
- executor.awaitTermination(1000, java.util.concurrent.TimeUnit.MILLISECONDS)
-
- assertEquals(count, map(0).get)
- val created = createdCount.get
- assertTrue(s"Too many creations $created", created > 0 && created <= nThreads)
- } finally {
- executor.shutdownNow()
- }
- }
-}