You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/03/10 17:31:26 UTC
git commit: SAMZA-16; fix java 7 test incompatibility
Repository: incubator-samza
Updated Branches:
refs/heads/master 38e828832 -> 048ffd2fe
SAMZA-16; fix java 7 test incompatibility
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/048ffd2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/048ffd2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/048ffd2f
Branch: refs/heads/master
Commit: 048ffd2fee118b5d15ca9fd962477ee8a1dc67c8
Parents: 38e8288
Author: Martin Kleppmann <ma...@rapportive.com>
Authored: Mon Mar 10 09:31:19 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Mon Mar 10 09:31:19 2014 -0700
----------------------------------------------------------------------
.../samza/system/kafka/TopicMetadataCache.scala | 4 ++
.../system/kafka/TestTopicMetadataCache.scala | 68 +++++++++++---------
2 files changed, 40 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/048ffd2f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
index 8a24ce3..8a8834f 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
@@ -65,4 +65,8 @@ object TopicMetadataCache extends Logging {
}.toMap
}
}
+
+ def clear {
+ topicMetadataMap.clear
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/048ffd2f/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
index 3c9c8d6..9ddcb71 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
@@ -18,6 +18,9 @@
*/
package org.apache.samza.system.kafka
+
+import org.junit.Assert._
+import org.junit.Before
import org.junit.Test
import kafka.api.TopicMetadata
import org.apache.samza.util.TopicMetadataStore
@@ -28,7 +31,7 @@ import org.apache.samza.util.Clock
class TestTopicMetadataCache {
- object MockTime extends Clock {
+ class MockTime extends Clock {
var currentValue = 0
def currentTimeMillis: Long = currentValue
@@ -46,66 +49,67 @@ class TestTopicMetadataCache {
numberOfCalls.getAndIncrement
topicMetadata
}
- /*
- def onTopicMissingMock(topic: String, zkClient: ZkClient) {
- mockCache += topic -> new TopicMetadata(topic, List.empty, 0)
- }
- */
def setErrorCode(topic: String, errorCode: Short) {
mockCache += topic -> new TopicMetadata(topic, List.empty, errorCode)
}
- def resetNumOfCalls = numberOfCalls = new AtomicInteger(0)
}
- val mockStore = new MockTopicMetadataStore()
- val waitForThreadStart = new CountDownLatch(3)
+ @Before def setup {
+ TopicMetadataCache.clear
+ }
@Test
def testBasicMetadataCacheFunctionality {
+ val mockStore = new MockTopicMetadataStore
+ val mockTime = new MockTime
+
// Retrieve a topic from the cache. Initially cache is empty and store is queried to get the data
mockStore.setErrorCode("topic1", 3)
- var metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, () => MockTime.currentTimeMillis)
- assert(metadata("topic1").topic.equals("topic1"))
- assert(metadata("topic1").errorCode == 3)
- assert(mockStore.numberOfCalls.get() == 1)
+ var metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
+ assertEquals("topic1", metadata("topic1").topic)
+ assertEquals(3, metadata("topic1").errorCode)
+ assertEquals(1, mockStore.numberOfCalls.get())
// Retrieve the same topic from the cache which has an error code. Ensure the store is called to refresh the cache
- MockTime.currentValue = 5
+ mockTime.currentValue = 5
mockStore.setErrorCode("topic1", 0)
- metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, () => MockTime.currentTimeMillis)
- assert(metadata("topic1").topic.equals("topic1"))
- assert(metadata("topic1").errorCode == 0)
- assert(mockStore.numberOfCalls.get() == 2)
+ metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
+ assertEquals("topic1", metadata("topic1").topic)
+ assertEquals(0, metadata("topic1").errorCode)
+ assertEquals(2, mockStore.numberOfCalls.get())
// Retrieve the same topic from the cache with refresh rate greater than the last update. Ensure the store is not
// called
- metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, () => MockTime.currentTimeMillis)
- assert(metadata("topic1").topic.equals("topic1"))
- assert(metadata("topic1").errorCode == 0)
- assert(mockStore.numberOfCalls.get() == 2)
+ metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
+ assertEquals("topic1", metadata("topic1").topic)
+ assertEquals(0, metadata("topic1").errorCode)
+ assertEquals(2, mockStore.numberOfCalls.get())
// Ensure that refresh happens when refresh rate is less than the last update. Ensure the store is called
- MockTime.currentValue = 11
- metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, () => MockTime.currentTimeMillis)
- assert(metadata("topic1").topic.equals("topic1"))
- assert(metadata("topic1").errorCode == 0)
- assert(mockStore.numberOfCalls.get() == 3)
+ mockTime.currentValue = 11
+ metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
+ assertEquals("topic1", metadata("topic1").topic)
+ assertEquals(0, metadata("topic1").errorCode)
+ assertEquals(3, mockStore.numberOfCalls.get())
}
@Test
def testMultiThreadedInteractionForTopicMetadataCache {
- mockStore.resetNumOfCalls
- MockTime.currentValue = 17
+ val mockStore = new MockTopicMetadataStore
+ val mockTime = new MockTime
+ val waitForThreadStart = new CountDownLatch(3)
val numAssertionSuccess = new AtomicBoolean(true)
// Add topic to the cache from multiple threads and ensure the store is called only once
val threads = new Array[Thread](3)
+
+ mockTime.currentValue = 17
for (i <- 0 until 3) {
threads(i) = new Thread(new Runnable {
def run {
waitForThreadStart.countDown()
waitForThreadStart.await()
- val metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, () => MockTime.currentTimeMillis)
+ val metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
numAssertionSuccess.compareAndSet(true, metadata("topic1").topic.equals("topic1"))
numAssertionSuccess.compareAndSet(true, metadata("topic1").errorCode == 0)
}
@@ -115,7 +119,7 @@ class TestTopicMetadataCache {
for (i <- 0 until 3) {
threads(i).join
}
- assert(numAssertionSuccess.get() == true)
- assert(mockStore.numberOfCalls.get() == 1)
+ assertTrue(numAssertionSuccess.get())
+ assertEquals(1, mockStore.numberOfCalls.get())
}
}