You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/03/10 17:31:26 UTC

git commit: SAMZA-16; fix java 7 test incompatibility

Repository: incubator-samza
Updated Branches:
  refs/heads/master 38e828832 -> 048ffd2fe


SAMZA-16; fix java 7 test incompatibility


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/048ffd2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/048ffd2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/048ffd2f

Branch: refs/heads/master
Commit: 048ffd2fee118b5d15ca9fd962477ee8a1dc67c8
Parents: 38e8288
Author: Martin Kleppmann <ma...@rapportive.com>
Authored: Mon Mar 10 09:31:19 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Mon Mar 10 09:31:19 2014 -0700

----------------------------------------------------------------------
 .../samza/system/kafka/TopicMetadataCache.scala |  4 ++
 .../system/kafka/TestTopicMetadataCache.scala   | 68 +++++++++++---------
 2 files changed, 40 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/048ffd2f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
index 8a24ce3..8a8834f 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
@@ -65,4 +65,8 @@ object TopicMetadataCache extends Logging {
         }.toMap
     }
   }
+
+  def clear {
+    topicMetadataMap.clear
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/048ffd2f/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
index 3c9c8d6..9ddcb71 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
@@ -18,6 +18,9 @@
  */
 
 package org.apache.samza.system.kafka
+
+import org.junit.Assert._
+import org.junit.Before
 import org.junit.Test
 import kafka.api.TopicMetadata
 import org.apache.samza.util.TopicMetadataStore
@@ -28,7 +31,7 @@ import org.apache.samza.util.Clock
 
 class TestTopicMetadataCache {
 
-  object MockTime extends Clock {
+  class MockTime extends Clock {
     var currentValue = 0
 
     def currentTimeMillis: Long = currentValue
@@ -46,66 +49,67 @@ class TestTopicMetadataCache {
       numberOfCalls.getAndIncrement
       topicMetadata
     }
-    /*
-    def onTopicMissingMock(topic: String, zkClient: ZkClient) {
-      mockCache += topic -> new TopicMetadata(topic, List.empty, 0)
-    }
-    */
 
     def setErrorCode(topic: String, errorCode: Short) {
       mockCache += topic -> new TopicMetadata(topic, List.empty, errorCode)
     }
-    def resetNumOfCalls = numberOfCalls = new AtomicInteger(0)
   }
 
-  val mockStore = new MockTopicMetadataStore()
-  val waitForThreadStart = new CountDownLatch(3)
+  @Before def setup {
+    TopicMetadataCache.clear
+  }
 
   @Test
   def testBasicMetadataCacheFunctionality {
+    val mockStore = new MockTopicMetadataStore
+    val mockTime = new MockTime
+
     // Retrieve a topic from the cache. Initially cache is empty and store is queried to get the data
     mockStore.setErrorCode("topic1", 3)
-    var metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, () => MockTime.currentTimeMillis)
-    assert(metadata("topic1").topic.equals("topic1"))
-    assert(metadata("topic1").errorCode == 3)
-    assert(mockStore.numberOfCalls.get() == 1)
+    var metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
+    assertEquals("topic1", metadata("topic1").topic)
+    assertEquals(3, metadata("topic1").errorCode)
+    assertEquals(1, mockStore.numberOfCalls.get())
 
     // Retrieve the same topic from the cache which has an error code. Ensure the store is called to refresh the cache
-    MockTime.currentValue = 5
+    mockTime.currentValue = 5
     mockStore.setErrorCode("topic1", 0)
-    metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, () => MockTime.currentTimeMillis)
-    assert(metadata("topic1").topic.equals("topic1"))
-    assert(metadata("topic1").errorCode == 0)
-    assert(mockStore.numberOfCalls.get() == 2)
+    metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
+    assertEquals("topic1", metadata("topic1").topic)
+    assertEquals(0, metadata("topic1").errorCode)
+    assertEquals(2, mockStore.numberOfCalls.get())
 
     // Retrieve the same topic from the cache with refresh rate greater than the last update. Ensure the store is not
     // called
-    metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, () => MockTime.currentTimeMillis)
-    assert(metadata("topic1").topic.equals("topic1"))
-    assert(metadata("topic1").errorCode == 0)
-    assert(mockStore.numberOfCalls.get() == 2)
+    metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
+    assertEquals("topic1", metadata("topic1").topic)
+    assertEquals(0, metadata("topic1").errorCode)
+    assertEquals(2, mockStore.numberOfCalls.get())
 
     // Ensure that refresh happens when refresh rate is less than the last update. Ensure the store is called
-    MockTime.currentValue = 11
-    metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, () => MockTime.currentTimeMillis)
-    assert(metadata("topic1").topic.equals("topic1"))
-    assert(metadata("topic1").errorCode == 0)
-    assert(mockStore.numberOfCalls.get() == 3)
+    mockTime.currentValue = 11
+    metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
+    assertEquals("topic1", metadata("topic1").topic)
+    assertEquals(0, metadata("topic1").errorCode)
+    assertEquals(3, mockStore.numberOfCalls.get())
   }
 
   @Test
   def testMultiThreadedInteractionForTopicMetadataCache {
-    mockStore.resetNumOfCalls
-    MockTime.currentValue = 17
+    val mockStore = new MockTopicMetadataStore
+    val mockTime = new MockTime
+    val waitForThreadStart = new CountDownLatch(3)
     val numAssertionSuccess = new AtomicBoolean(true)
     // Add topic to the cache from multiple threads and ensure the store is called only once
     val threads = new Array[Thread](3)
+
+    mockTime.currentValue = 17
     for (i <- 0 until 3) {
       threads(i) = new Thread(new Runnable {
         def run {
           waitForThreadStart.countDown()
           waitForThreadStart.await()
-          val metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, () => MockTime.currentTimeMillis)
+          val metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
           numAssertionSuccess.compareAndSet(true, metadata("topic1").topic.equals("topic1"))
           numAssertionSuccess.compareAndSet(true, metadata("topic1").errorCode == 0)
         }
@@ -115,7 +119,7 @@ class TestTopicMetadataCache {
     for (i <- 0 until 3) {
       threads(i).join
     }
-    assert(numAssertionSuccess.get() == true)
-    assert(mockStore.numberOfCalls.get() == 1)
+    assertTrue(numAssertionSuccess.get())
+    assertEquals(1, mockStore.numberOfCalls.get())
   }
 }