You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/10/24 01:11:52 UTC
samza git commit: Tests for deprecated Kafka Consumer
Repository: samza
Updated Branches:
refs/heads/master 18b940134 -> 282f83494
Tests for deprecated Kafka Consumer
Tested by running with the system factory set to org.apache.samza.system.kafka_deprecated.KafkaSystemFactory.
Author: Boris S <bs...@linkedin.com>
Author: Boris S <bo...@apache.org>
Author: Boris Shkolnik <bs...@linkedin.com>
Reviewers: Cameron Lee <ca...@linkedin.com>
Closes #755 from sborya/OldKafkaConsumer
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/282f8349
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/282f8349
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/282f8349
Branch: refs/heads/master
Commit: 282f834943bf3504d17406d93ecd8d92a3b96ced
Parents: 18b9401
Author: Boris S <bs...@linkedin.com>
Authored: Tue Oct 23 18:11:43 2018 -0700
Committer: Boris S <bs...@linkedin.com>
Committed: Tue Oct 23 18:11:43 2018 -0700
----------------------------------------------------------------------
.../kafka_deprecated/TestBrokerProxy.scala | 434 +++++++++++++
.../system/kafka_deprecated/TestGetOffset.scala | 110 ++++
.../kafka_deprecated/TestKafkaSystemAdmin.scala | 351 +++++++++++
.../TestKafkaSystemConsumer.scala | 191 ++++++
.../TestKafkaSystemFactory.scala | 98 +++
.../TestKafkaSystemProducer.scala | 604 +++++++++++++++++++
.../TestTopicMetadataCache.scala | 139 +++++
7 files changed, 1927 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/282f8349/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestBrokerProxy.scala
new file mode 100644
index 0000000..702e674
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestBrokerProxy.scala
@@ -0,0 +1,434 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.samza.system.kafka_deprecated
+
+import java.nio.ByteBuffer
+import java.util.concurrent.CountDownLatch
+
+import kafka.api.{PartitionOffsetsResponse, _}
+import kafka.common.TopicAndPartition
+import kafka.consumer.SimpleConsumer
+import kafka.message.{ByteBufferMessageSet, Message, MessageAndOffset, MessageSet}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.samza.SamzaException
+import org.apache.samza.util.Logging
+import org.junit.Assert._
+import org.junit._
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.mockito.{Matchers, Mockito}
+
+import scala.collection.JavaConverters._
+
+class TestBrokerProxy extends Logging {
+  val tp2 = new TopicAndPartition("Redbird", 2013)
+  // Controls whether the mock sink asks for any messages at all (tp2 is never requested). The test
+  // thread flips it while the BrokerProxy's background thread reads it via sink.needsMoreMessages,
+  // so it is @volatile to guarantee the update becomes visible to that thread.
+  @volatile var fetchTp1 = true
+
+  /** Only tp should receive messages; tp2 is registered but the sink never asks for it. */
+  @Test def brokerProxyRetrievesMessagesCorrectly(): Unit = {
+    val (bp, tp, sink) = getMockBrokerProxy()
+
+    bp.start
+    try {
+      bp.addTopicPartition(tp, Option("0"))
+      // Add tp2, which should never receive messages since sink disables it.
+      bp.addTopicPartition(tp2, Option("0"))
+      Thread.sleep(1000)
+      assertEquals(2, sink.receivedMessages.size)
+      assertEquals(42, sink.receivedMessages(0)._2.offset)
+      assertEquals(84, sink.receivedMessages(1)._2.offset)
+    } finally {
+      // Stop the proxy so its background thread does not keep running after this test.
+      bp.stop
+    }
+  }
+
+  /** With only a disabled partition registered, the proxy should skip fetches instead of issuing empty ones. */
+  @Test def brokerProxySkipsFetchForEmptyRequests(): Unit = {
+    val (bp, tp, sink) = getMockBrokerProxy()
+
+    bp.start
+    try {
+      // Only add tp2, which should never receive messages since sink disables it.
+      bp.addTopicPartition(tp2, Option("0"))
+      Thread.sleep(1000)
+      assertEquals(0, sink.receivedMessages.size)
+      assertTrue(bp.metrics.brokerSkippedFetchRequests.get((bp.host, bp.port)).getCount > 0)
+      assertEquals(0, bp.metrics.brokerReads.get((bp.host, bp.port)).getCount)
+    } finally {
+      bp.stop
+    }
+  }
+
+  /** Registering the same TopicAndPartition twice must be rejected with a SamzaException. */
+  @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions(): Unit = {
+    val (bp, tp, _) = getMockBrokerProxy()
+    bp.start
+    try {
+      bp.addTopicPartition(tp, Option("0"))
+
+      try {
+        bp.addTopicPartition(tp, Option("1"))
+        fail("Should have thrown an exception")
+      } catch {
+        case se: SamzaException => assertEquals("Already consuming TopicPartition [Redbird,2012]", se.getMessage)
+        case other: Exception => fail("Got some other exception than what we were expecting: " + other)
+      }
+    } finally {
+      bp.stop
+    }
+  }
+
+  /**
+   * Builds a BrokerProxy wired to a recording MessageSink and a mocked SimpleConsumer.
+   *
+   * The sink records every message it is given. It never asks for messages for tp2, and asks for
+   * nothing while fetchTp1 is false. The mocked consumer serves two messages (offsets 42 and 84)
+   * for tp with a high watermark of 500, and answers earliestOrLatestOffset with 100.
+   *
+   * @return the proxy, the TopicAndPartition ("Redbird", 2012) it is expected to fetch, and the sink
+   */
+  def getMockBrokerProxy() = {
+    val sink = new MessageSink {
+      val receivedMessages = new scala.collection.mutable.ListBuffer[(TopicAndPartition, MessageAndOffset, Boolean)]()
+
+      def abdicate(tp: TopicAndPartition, nextOffset: Long) {}
+
+      def refreshDropped() {}
+
+      def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {
+        // The Boolean records whether this message's offset equals the reported high watermark.
+        receivedMessages += ((tp, msg, msg.offset.equals(highWatermark)))
+      }
+
+      def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
+      }
+
+      // Never need messages for tp2; other partitions are wanted only while fetchTp1 is set.
+      def needsMoreMessages(tp: TopicAndPartition): Boolean = !tp.equals(tp2) && fetchTp1
+    }
+
+    val system = "daSystem"
+    val host = "host"
+    val port = 2222
+    val tp = new TopicAndPartition("Redbird", 2012)
+    val metrics = new KafkaSystemConsumerMetrics(system)
+
+    metrics.registerBrokerProxy(host, port)
+    metrics.registerTopicAndPartition(tp)
+    metrics.topicPartitions.get((host, port)).set(1)
+
+    val bp = new BrokerProxy(
+      host,
+      port,
+      system,
+      "daClientId",
+      metrics,
+      sink,
+      offsetGetter = new GetOffset("fail", Map("Redbird" -> "largest"))) {
+
+      // Speed up for test
+      override val sleepMSWhileNoTopicPartitions = 100
+      // NOTE(review): this initializer runs after the BrokerProxy superclass constructor; if that
+      // constructor already calls createSimpleConsumer(), the flag is reset to false here and the guard
+      // below would not catch a second creation — confirm.
+      var alreadyCreatedConsumer = false
+
+      // Scala traits and Mockito mocks don't mix, unfortunately.
+      override def createSimpleConsumer() = {
+        if (alreadyCreatedConsumer) {
+          System.err.println("Should only be creating one consumer in this test!")
+          throw new InterruptedException("Should only be creating one consumer in this test!")
+        }
+        alreadyCreatedConsumer = true
+
+        // Real DefaultFetchSimpleConsumer subclass whose calls are all delegated to a Mockito mock.
+        // The stubbing statements below run in this anonymous class's constructor, in order.
+        new DefaultFetchSimpleConsumer("a", 1, 2, 3, "b", new StreamFetchSizes(42)) {
+          val sc = Mockito.mock(classOf[SimpleConsumer])
+          val mockOffsetResponse = {
+            val offsetResponse = Mockito.mock(classOf[OffsetResponse])
+            val partitionOffsetResponse = {
+              val por = Mockito.mock(classOf[PartitionOffsetsResponse])
+              when(por.offsets).thenReturn(List(1L).toSeq)
+              por
+            }
+
+            val map = scala.Predef.Map[TopicAndPartition, PartitionOffsetsResponse](tp -> partitionOffsetResponse, tp2 -> partitionOffsetResponse)
+            when(offsetResponse.partitionErrorAndOffsets).thenReturn(map)
+            offsetResponse
+          }
+
+          when(sc.getOffsetsBefore(any(classOf[OffsetRequest]))).thenReturn(mockOffsetResponse)
+
+          val fetchResponse = {
+            val fetchResponse = Mockito.mock(classOf[FetchResponse])
+
+            val messageSet = {
+              val messageSet = Mockito.mock(classOf[ByteBufferMessageSet])
+
+              def getMessage() = new Message(Mockito.mock(classOf[ByteBuffer]))
+              val messages = List(new MessageAndOffset(getMessage, 42), new MessageAndOffset(getMessage, 84))
+
+              when(messageSet.sizeInBytes).thenReturn(43)
+              when(messageSet.size).thenReturn(44)
+              // thenReturn hands back this one iterator instance on every call, so the two messages
+              // can be consumed only once; later fetches see an exhausted iterator.
+              when(messageSet.iterator).thenReturn(messages.iterator)
+              when(messageSet.head).thenReturn(messages.head)
+              messageSet
+            }
+
+            // High watermark 500 for tp.
+            val fetchResponsePartitionData = FetchResponsePartitionData(Errors.NONE, 500, messageSet)
+            val map = scala.Predef.Map[TopicAndPartition, FetchResponsePartitionData](tp -> fetchResponsePartitionData)
+
+            when(fetchResponse.data).thenReturn(map.toSeq)
+            when(fetchResponse.messageSet(any(classOf[String]), any(classOf[Int]))).thenReturn(messageSet)
+            fetchResponse
+          }
+          when(sc.fetch(any(classOf[FetchRequest]))).thenReturn(fetchResponse)
+
+          override def close() = sc.close()
+
+          override def send(request: TopicMetadataRequest): TopicMetadataResponse = sc.send(request)
+
+          override def fetch(request: FetchRequest): FetchResponse = {
+            // Verify that we only get fetch requests for one tp, even though
+            // two were registered. This is to verify that
+            // sink.needsMoreMessages works.
+            assertEquals(1, request.requestInfo.size)
+            sc.fetch(request)
+          }
+
+          when(sc.earliestOrLatestOffset(any(classOf[TopicAndPartition]), any(classOf[Long]), any(classOf[Int]))).thenReturn(100)
+
+          override def getOffsetsBefore(request: OffsetRequest): OffsetResponse = sc.getOffsetsBefore(request)
+
+          override def commitOffsets(request: OffsetCommitRequest): OffsetCommitResponse = sc.commitOffsets(request)
+
+          override def fetchOffsets(request: OffsetFetchRequest): OffsetFetchResponse = sc.fetchOffsets(request)
+
+          override def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = sc.earliestOrLatestOffset(topicAndPartition, earliestOrLatest, consumerId)
+        }
+      }
+
+    }
+
+    (bp, tp, sink)
+  }
+
+  /** High-watermark and lag gauges should update both while fetching and while fetching is paused. */
+  @Test def brokerProxyUpdateLatencyMetrics(): Unit = {
+    val (bp, tp, _) = getMockBrokerProxy()
+
+    bp.start
+    try {
+      bp.addTopicPartition(tp, Option("0"))
+      Thread.sleep(1000)
+      // update when fetching messages (500 is the fetch response's high watermark; 415 == 500 - 85,
+      // i.e. measured from the offset after the last delivered message, 84)
+      assertEquals(500, bp.metrics.highWatermark.get(tp).getValue)
+      assertEquals(415, bp.metrics.lag.get(tp).getValue)
+
+      fetchTp1 = false
+      Thread.sleep(1000)
+      // update when not fetching messages (100 is the stubbed earliestOrLatestOffset; 15 == 100 - 85)
+      assertEquals(100, bp.metrics.highWatermark.get(tp).getValue)
+      assertEquals(15, bp.metrics.lag.get(tp).getValue)
+    } finally {
+      fetchTp1 = true
+      bp.stop
+    }
+  }
+
+  /**
+   * On OFFSET_OUT_OF_RANGE the proxy should ask the offset getter for a reset offset and re-fetch
+   * from it (1492) using the same simple consumer.
+   */
+  @Test def brokerProxyCorrectlyHandlesOffsetOutOfRange(): Unit = {
+    // Need to wait for the thread to do some work before ending the test
+    val countdownLatch = new CountDownLatch(1)
+    // Written by the proxy thread before it counts down the latch; read by the test thread afterwards.
+    var failString: String = null
+
+    val mockMessageSink = mock(classOf[MessageSink])
+    when(mockMessageSink.needsMoreMessages(any())).thenReturn(true)
+
+    val doNothingMetrics = new KafkaSystemConsumerMetrics()
+
+    val tp = new TopicAndPartition("topic", 42)
+
+    val mockOffsetGetter = mock(classOf[GetOffset])
+    // This will be used by the simple consumer below, and this is the response that simple consumer needs
+    when(mockOffsetGetter.isValidOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp), Matchers.eq("0"))).thenReturn(true)
+    when(mockOffsetGetter.getResetOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp))).thenReturn(1492L)
+
+    var callsToCreateSimpleConsumer = 0
+    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
+
+    // Create an answer that first indicates offset out of range on first invocation and on second
+    // verifies that the parameters have been updated to what we expect them to be
+    val answer = new Answer[FetchResponse]() {
+      var invocationCount = 0
+
+      def answer(invocation: InvocationOnMock): FetchResponse = {
+        // First (TopicAndPartition, offset) pair handed to defaultFetch. The tuple cast is unchecked
+        // (erasure), so only the Tuple2 class is verified at runtime.
+        val arguments = invocation.getArguments()(0).asInstanceOf[List[Object]](0).asInstanceOf[(TopicAndPartition, Long)]
+
+        if (invocationCount == 0) {
+          if (arguments != ((tp, 0))) {
+            failString = "First invocation did not have the right arguments: " + arguments
+            countdownLatch.countDown()
+          }
+          val mfr = mock(classOf[FetchResponse])
+          when(mfr.hasError).thenReturn(true)
+          when(mfr.error("topic", 42)).thenReturn(Errors.OFFSET_OUT_OF_RANGE)
+
+          val messageSet = mock(classOf[MessageSet])
+          when(messageSet.iterator).thenReturn(Iterator.empty)
+          val response = mock(classOf[FetchResponsePartitionData])
+          when(response.error).thenReturn(Errors.OFFSET_OUT_OF_RANGE)
+          val responseMap = Map(tp -> response)
+          when(mfr.data).thenReturn(responseMap.toSeq)
+          invocationCount += 1
+          mfr
+        } else {
+          if (arguments != ((tp, 1492))) {
+            failString = "On second invocation, arguments were not correct: " + arguments
+          }
+          countdownLatch.countDown()
+          // Interrupt the proxy thread, presumably so its fetch loop winds down once the check is done.
+          Thread.currentThread().interrupt()
+          null
+        }
+      }
+    }
+
+    when(mockSimpleConsumer.defaultFetch(any())).thenAnswer(answer)
+
+    // So now we have a fetch response that will fail. Prime the mockGetOffset to send us to a new offset
+
+    val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
+
+      override def createSimpleConsumer() = {
+        // NOTE(review): this guard only trips on the third call even though the message says "more
+        // than one"; confirm whether a second creation (e.g. a reconnect after the null response
+        // above) is expected before tightening it to `> 0`.
+        if (callsToCreateSimpleConsumer > 1) {
+          failString = "Tried to create more than one simple consumer"
+          countdownLatch.countDown()
+        }
+        callsToCreateSimpleConsumer += 1
+        mockSimpleConsumer
+      }
+    }
+
+    bp.addTopicPartition(tp, Option("0"))
+    bp.start
+    // Bounded wait so a regression fails the test instead of hanging the build.
+    val completed = countdownLatch.await(30, java.util.concurrent.TimeUnit.SECONDS)
+    bp.stop
+    assertTrue("Timed out waiting for the broker proxy to re-fetch after OFFSET_OUT_OF_RANGE", completed)
+    if (failString != null) {
+      fail(failString)
+    }
+  }
+
+  /**
+   * Test that makes sure that BrokerProxy abdicates all TopicAndPartitions
+   * that it owns when a consumer failure occurs.
+   */
+  @Test def brokerProxyAbdicatesOnConnectionFailure(): Unit = {
+    val countdownLatch = new CountDownLatch(1)
+    var abdicated: Option[TopicAndPartition] = None
+    @volatile var refreshDroppedCount = 0
+    val mockMessageSink = new MessageSink {
+      override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
+      }
+
+      override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {
+      }
+
+      override def abdicate(tp: TopicAndPartition, nextOffset: Long) {
+        abdicated = Some(tp)
+        countdownLatch.countDown()
+      }
+
+      override def refreshDropped() {
+        refreshDroppedCount += 1
+      }
+
+      override def needsMoreMessages(tp: TopicAndPartition): Boolean = {
+        true
+      }
+    }
+
+    val doNothingMetrics = new KafkaSystemConsumerMetrics()
+    val tp = new TopicAndPartition("topic", 42)
+    val mockOffsetGetter = mock(classOf[GetOffset])
+    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
+
+    when(mockOffsetGetter.isValidOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp), Matchers.eq("0"))).thenReturn(true)
+    when(mockOffsetGetter.getResetOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp))).thenReturn(1492L)
+    when(mockSimpleConsumer.defaultFetch(any())).thenThrow(new SamzaException("Pretend this is a ClosedChannelException. Can't use ClosedChannelException because it's checked, and Mockito doesn't like that."))
+
+    val bp = new BrokerProxy("host", 567, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
+      override def createSimpleConsumer() = {
+        mockSimpleConsumer
+      }
+    }
+
+    // Blocks until refreshDropped has been called at least once more, failing after 30 seconds
+    // instead of spinning forever.
+    val waitForRefresh = () => {
+      val currentRefreshDroppedCount = refreshDroppedCount
+      val deadline = System.currentTimeMillis + 30000
+      while (refreshDroppedCount == currentRefreshDroppedCount) {
+        assertTrue("Timed out waiting for refreshDropped to be called", System.currentTimeMillis < deadline)
+        Thread.sleep(100)
+      }
+    }
+
+    bp.addTopicPartition(tp, Option("0"))
+    bp.start
+    try {
+      // BP should refresh on startup.
+      waitForRefresh()
+      assertTrue("Timed out waiting for the broker proxy to abdicate", countdownLatch.await(30, java.util.concurrent.TimeUnit.SECONDS))
+      // BP should continue refreshing after it's abdicated all TopicAndPartitions.
+      waitForRefresh()
+    } finally {
+      bp.stop
+    }
+    assertEquals(Some(tp), abdicated)
+  }
+
+  /**
+   * An OutOfMemoryError or StackOverflowError escaping the sink (here from refreshDropped) should
+   * make the proxy thread fail with a SamzaException whose message names the error.
+   */
+  @Test def brokerProxyAbdicatesHardErrors(): Unit = {
+    val doNothingMetrics = new KafkaSystemConsumerMetrics
+    val mockMessageSink = new MessageSink {
+      override def needsMoreMessages(tp: TopicAndPartition): Boolean = true
+      override def abdicate(tp: TopicAndPartition, nextOffset: Long) {}
+      override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {}
+      override def refreshDropped() {throw new OutOfMemoryError("Test - OOME")}
+      override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean): Unit = {}
+    }
+    val mockOffsetGetter = mock(classOf[GetOffset])
+    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
+
+    val bp = new BrokerProxy("host", 658, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
+      override def createSimpleConsumer() = {
+        mockSimpleConsumer
+      }
+    }
+    var caughtError = false
+    try {
+      // Run the proxy loop synchronously on the test thread so the failure surfaces here.
+      bp.thread.run
+    } catch {
+      case e: SamzaException =>
+        assertEquals("Got out of memory error in broker proxy thread.", e.getMessage)
+        info("Received OutOfMemoryError in broker proxy.")
+        caughtError = true
+    }
+    assertTrue(caughtError)
+    val mockMessageSink2 = new MessageSink {
+      override def needsMoreMessages(tp: TopicAndPartition): Boolean = true
+      override def abdicate(tp: TopicAndPartition, nextOffset: Long): Unit = {}
+      override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long): Unit = {}
+      override def refreshDropped(): Unit = {throw new StackOverflowError("Test - SOE")}
+      override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean): Unit = {}
+    }
+    caughtError = false
+    val bp2 = new BrokerProxy("host", 689, "system", "clientID2", doNothingMetrics, mockMessageSink2, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
+      override def createSimpleConsumer() = {
+        mockSimpleConsumer
+      }
+    }
+    try {
+      bp2.thread.run
+    } catch {
+      case e: SamzaException =>
+        assertEquals("Got stack overflow error in broker proxy thread.", e.getMessage)
+        info("Received StackOverflowError in broker proxy.")
+        caughtError = true
+    }
+    assertTrue(caughtError)
+  }
+
+  /** Stopping a started proxy must close its simple consumer. */
+  @Test
+  def brokerProxyStopCloseConsumer(): Unit = {
+    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
+    val bp = new BrokerProxy("host", 0, "system", "clientID", new KafkaSystemConsumerMetrics(), null){
+      override def createSimpleConsumer() = {
+        mockSimpleConsumer
+      }
+    }
+    bp.start
+    bp.stop
+    verify(mockSimpleConsumer).close
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/282f8349/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestGetOffset.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestGetOffset.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestGetOffset.scala
new file mode 100644
index 0000000..21ee4cd
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestGetOffset.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka_deprecated
+
+import java.nio.ByteBuffer
+
+import kafka.api._
+import kafka.common.TopicAndPartition
+import kafka.consumer.SimpleConsumer
+import kafka.message.Message
+import kafka.message.ByteBufferMessageSet
+import org.apache.kafka.common.errors.OffsetOutOfRangeException
+import org.junit._
+import org.junit.Assert._
+import org.mockito.Mockito
+import org.mockito.Mockito._
+import org.mockito.Matchers._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+
+class TestGetOffset {
+
+  // Offset the mock consumer treats as out of range: any fetch request for it throws
+  // OffsetOutOfRangeException (see getMockDefaultFetchSimpleConsumer).
+  private val outOfRangeOffset : String = "0"
+
+  /**
+   * An empty message set is still a valid offset. It just means that the
+   * offset was for the upcoming message, which hasn't yet been written. The
+   * fetch request times out in such a case, and an empty message set is
+   * returned.
+   */
+  @Test
+  def testIsValidOffsetWorksWithEmptyMessageSet(): Unit = {
+    val getOffset = new GetOffset(OffsetRequest.LargestTimeString)
+    // Should not throw an exception.
+    assertTrue(getOffset.isValidOffset(getMockDefaultFetchSimpleConsumer, TopicAndPartition("foo", 1), "1234"))
+  }
+
+  /**
+   * A fetch that fails with OffsetOutOfRangeException means the requested
+   * offset is not valid: isValidOffset should report false rather than
+   * propagate the exception.
+   */
+  @Test
+  def testIsValidOffsetWorksWithOffsetOutOfRangeException(): Unit = {
+    val getOffset = new GetOffset(OffsetRequest.LargestTimeString)
+    // Should not throw an exception; the out-of-range fetch must surface as `false`.
+    assertFalse(getOffset.isValidOffset(getMockDefaultFetchSimpleConsumer, TopicAndPartition("foo", 1), outOfRangeOffset))
+  }
+
+  /**
+   * Create a default fetch simple consumer that returns empty message sets,
+   * except that a fetch for outOfRangeOffset throws OffsetOutOfRangeException.
+   */
+  def getMockDefaultFetchSimpleConsumer = {
+    new DefaultFetchSimpleConsumer("", 0, 0, 0, "") {
+      val sc = Mockito.mock(classOf[SimpleConsumer])
+
+      // Build an empty fetch response.
+      val fetchResponse = {
+        val fetchResponse = Mockito.mock(classOf[FetchResponse])
+        val messageSet = {
+          val messageSet = Mockito.mock(classOf[ByteBufferMessageSet])
+
+          when(messageSet.sizeInBytes).thenReturn(0)
+          when(messageSet.size).thenReturn(0)
+          when(messageSet.iterator).thenReturn(Iterator.empty)
+
+          messageSet
+        }
+        when(fetchResponse.messageSet(any(classOf[String]), any(classOf[Int]))).thenReturn(messageSet)
+
+        fetchResponse
+      }
+
+      // Throw for requests that ask for the out-of-range offset; otherwise return the empty response.
+      doAnswer(new Answer[FetchResponse] {
+        override def answer(invocation: InvocationOnMock): FetchResponse = {
+          if (invocation.getArgumentAt(0, classOf[FetchRequest]).requestInfo.exists(
+            req => req._2.offset.toString.equals(outOfRangeOffset))) {
+            throw new OffsetOutOfRangeException("test exception")
+          }
+          fetchResponse
+        }
+      }).when(sc).fetch(any(classOf[FetchRequest]))
+
+      override def fetch(request: FetchRequest): FetchResponse = {
+        sc.fetch(request)
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/282f8349/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemAdmin.scala
new file mode 100644
index 0000000..bf64c03
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemAdmin.scala
@@ -0,0 +1,351 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka_deprecated
+
+import java.util.{Properties, UUID}
+
+import kafka.admin.AdminUtils
+import org.apache.kafka.common.errors.LeaderNotAvailableException
+import org.apache.kafka.common.protocol.Errors
+import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector}
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{TestUtils, ZkUtils}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.samza.Partition
+import org.apache.samza.config.KafkaProducerConfig
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.{StreamSpec, SystemStreamMetadata, SystemStreamPartition}
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, TopicMetadataStore}
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.JavaConverters._
+
+/**
+ * README: New tests should be added to the Java tests. See TestKafkaSystemAdminJava
+ */
+object TestKafkaSystemAdmin extends KafkaServerTestHarness {
+  // Shared fixture: a numBrokers-broker cluster started once per test class (@BeforeClass/@AfterClass).
+
+  val SYSTEM = "kafka"
+  val TOPIC = "input"
+  val TOPIC2 = "input2"
+  val TOTAL_PARTITIONS = 50
+  val REPLICATION_FACTOR = 2
+  val zkSecure = JaasUtils.isZkSecurityEnabled()
+
+  protected def numBrokers: Int = 3
+
+  var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
+  var metadataStore: TopicMetadataStore = null
+  var producerConfig: KafkaProducerConfig = null
+  var systemAdmin: KafkaSystemAdmin = null
+
+  override def generateConfigs(): Seq[KafkaConfig] = {
+    val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, true)
+    props.map(KafkaConfig.fromProps)
+  }
+
+  /** Starts the cluster, then creates the shared producer, metadata store and system admin. */
+  @BeforeClass
+  override def setUp(): Unit = {
+    super.setUp()
+    val config = new java.util.HashMap[String, String]()
+    config.put("bootstrap.servers", brokerList)
+    config.put("acks", "all")
+    config.put("serializer.class", "kafka.serializer.StringEncoder")
+    producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+    producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
+    metadataStore = new ClientUtilTopicMetadataStore(brokerList, "some-job-name")
+    systemAdmin = new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure))
+    systemAdmin.start()
+  }
+
+  /** Stops the shared clients before shutting the cluster down. */
+  @AfterClass
+  override def tearDown(): Unit = {
+    systemAdmin.stop()
+    producer.close()
+    super.tearDown()
+  }
+
+  /** Creates topicName with partitionCount partitions and REPLICATION_FACTOR replicas. */
+  def createTopic(topicName: String, partitionCount: Int): Unit = {
+    AdminUtils.createTopic(
+      zkUtils,
+      topicName,
+      partitionCount,
+      REPLICATION_FACTOR)
+  }
+
+  /**
+   * Polls the topic's metadata until it reports expectedPartitionCount partitions, retrying up to
+   * 100 times with 500 ms between attempts, and fails the test if that never happens.
+   */
+  def validateTopic(topic: String, expectedPartitionCount: Int): Unit = {
+    var done = false
+    var retries = 0
+    val maxRetries = 100
+
+    while (!done && retries < maxRetries) {
+      try {
+        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), SYSTEM, metadataStore.getTopicInfo)
+        val topicMetadata = topicMetadataMap(topic)
+
+        KafkaUtil.maybeThrowException(topicMetadata.error.exception())
+
+        done = expectedPartitionCount == topicMetadata.partitionsMetadata.size
+      } catch {
+        case e: Exception =>
+          System.err.println("Got exception while validating test topics. Waiting and retrying.")
+          e.printStackTrace()
+      }
+
+      // Count and back off on every unsuccessful attempt, including a partition-count mismatch;
+      // otherwise that case would spin forever without sleeping.
+      if (!done) {
+        retries += 1
+        Thread.sleep(500)
+      }
+    }
+
+    if (!done) {
+      fail("Unable to successfully create topics. Tried to validate %s times." format retries)
+    }
+  }
+
+  /** Old high-level consumer in group "test" that starts from the smallest available offset. */
+  def getConsumerConnector(): ConsumerConnector = {
+    val props = new Properties
+
+    props.put("zookeeper.connect", zkConnect)
+    props.put("group.id", "test")
+    props.put("auto.offset.reset", "smallest")
+
+    val consumerConfig = new ConsumerConfig(props)
+    Consumer.create(consumerConfig)
+  }
+
+  /** Builds a KafkaSystemAdmin against the test cluster with the given coordinator-stream and changelog settings. */
+  def createSystemAdmin(coordinatorStreamProperties: Properties, coordinatorStreamReplicationFactor: Int, topicMetaInformation: Map[String, ChangelogInfo]): KafkaSystemAdmin = {
+    new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties,
+      coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation, Map(), false)
+  }
+
+}
+
+/**
+ * Test creates a local ZK and Kafka cluster, and uses it to create and test
+ * topics to verify that the offset APIs in SystemAdmin work as expected.
+ */
+class TestKafkaSystemAdmin {
+ import TestKafkaSystemAdmin._
+
+ @Test
+ def testShouldAssembleMetadata {
+ val oldestOffsets = Map(
+ new SystemStreamPartition(SYSTEM, "stream1", new Partition(0)) -> "o1",
+ new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)) -> "o2",
+ new SystemStreamPartition(SYSTEM, "stream1", new Partition(1)) -> "o3",
+ new SystemStreamPartition(SYSTEM, "stream2", new Partition(1)) -> "o4")
+ val newestOffsets = Map(
+ new SystemStreamPartition(SYSTEM, "stream1", new Partition(0)) -> "n1",
+ new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)) -> "n2",
+ new SystemStreamPartition(SYSTEM, "stream1", new Partition(1)) -> "n3",
+ new SystemStreamPartition(SYSTEM, "stream2", new Partition(1)) -> "n4")
+ val upcomingOffsets = Map(
+ new SystemStreamPartition(SYSTEM, "stream1", new Partition(0)) -> "u1",
+ new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)) -> "u2",
+ new SystemStreamPartition(SYSTEM, "stream1", new Partition(1)) -> "u3",
+ new SystemStreamPartition(SYSTEM, "stream2", new Partition(1)) -> "u4")
+ val metadata = KafkaSystemAdmin.assembleMetadata(oldestOffsets, newestOffsets, upcomingOffsets)
+ assertNotNull(metadata)
+ assertEquals(2, metadata.size)
+ assertTrue(metadata.contains("stream1"))
+ assertTrue(metadata.contains("stream2"))
+ val stream1Metadata = metadata("stream1")
+ val stream2Metadata = metadata("stream2")
+ assertNotNull(stream1Metadata)
+ assertNotNull(stream2Metadata)
+ assertEquals("stream1", stream1Metadata.getStreamName)
+ assertEquals("stream2", stream2Metadata.getStreamName)
+ val expectedSystemStream1Partition0Metadata = new SystemStreamPartitionMetadata("o1", "n1", "u1")
+ val expectedSystemStream1Partition1Metadata = new SystemStreamPartitionMetadata("o3", "n3", "u3")
+ val expectedSystemStream2Partition0Metadata = new SystemStreamPartitionMetadata("o2", "n2", "u2")
+ val expectedSystemStream2Partition1Metadata = new SystemStreamPartitionMetadata("o4", "n4", "u4")
+ val stream1PartitionMetadata = stream1Metadata.getSystemStreamPartitionMetadata
+ val stream2PartitionMetadata = stream2Metadata.getSystemStreamPartitionMetadata
+ assertEquals(expectedSystemStream1Partition0Metadata, stream1PartitionMetadata.get(new Partition(0)))
+ assertEquals(expectedSystemStream1Partition1Metadata, stream1PartitionMetadata.get(new Partition(1)))
+ assertEquals(expectedSystemStream2Partition0Metadata, stream2PartitionMetadata.get(new Partition(0)))
+ assertEquals(expectedSystemStream2Partition1Metadata, stream2PartitionMetadata.get(new Partition(1)))
+ }
+
+ @Test
+ def testShouldGetOldestNewestAndNextOffsets {
+ // Create an empty topic with 50 partitions, but with no offsets.
+ createTopic(TOPIC, 50)
+ validateTopic(TOPIC, 50)
+
+ // Verify the empty topic behaves as expected.
+ var metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC).asJava)
+ assertEquals(1, metadata.size)
+ assertNotNull(metadata.get(TOPIC))
+ // Verify partition count.
+ var sspMetadata = metadata.get(TOPIC).getSystemStreamPartitionMetadata
+ assertEquals(50, sspMetadata.size)
+ // Empty topics should have null for the newest offset and "0" for the oldest offset.
+ assertEquals("0", sspMetadata.get(new Partition(0)).getOldestOffset)
+ assertNull(sspMetadata.get(new Partition(0)).getNewestOffset)
+ // Empty Kafka topics should have an upcoming (next) offset of "0".
+ assertEquals("0", sspMetadata.get(new Partition(0)).getUpcomingOffset)
+
+ // Add a message to partition 48 (chosen explicitly in the ProducerRecord) and verify that
+ // only that partition's offsets advance.
+ producer.send(new ProducerRecord(TOPIC, 48, "key1".getBytes, "val1".getBytes)).get()
+ metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC).asJava)
+ assertEquals(1, metadata.size)
+ var streamName = metadata.keySet.asScala.head
+ assertEquals(TOPIC, streamName)
+ sspMetadata = metadata.get(streamName).getSystemStreamPartitionMetadata
+ assertEquals("0", sspMetadata.get(new Partition(48)).getOldestOffset)
+ assertEquals("0", sspMetadata.get(new Partition(48)).getNewestOffset)
+ assertEquals("1", sspMetadata.get(new Partition(48)).getUpcomingOffset)
+ // Some other partition should still be empty.
+ assertEquals("0", sspMetadata.get(new Partition(3)).getOldestOffset)
+ assertNull(sspMetadata.get(new Partition(3)).getNewestOffset)
+ assertEquals("0", sspMetadata.get(new Partition(3)).getUpcomingOffset)
+
+ // Add a second message to the same partition.
+ producer.send(new ProducerRecord(TOPIC, 48, "key1".getBytes, "val2".getBytes)).get()
+ metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC).asJava)
+ assertEquals(1, metadata.size)
+ // Re-read the stream name from the refreshed metadata so this assertion checks the new result.
+ streamName = metadata.keySet.asScala.head
+ assertEquals(TOPIC, streamName)
+ sspMetadata = metadata.get(streamName).getSystemStreamPartitionMetadata
+ assertEquals("0", sspMetadata.get(new Partition(48)).getOldestOffset)
+ assertEquals("1", sspMetadata.get(new Partition(48)).getNewestOffset)
+ assertEquals("2", sspMetadata.get(new Partition(48)).getUpcomingOffset)
+
+ // Validate that a fetch returns both messages. The connector is shut down only after both
+ // reads, so the second stream.next does not depend on the message having been buffered
+ // before shutdown (presumably the cause of flakiness otherwise).
+ val connector = getConsumerConnector
+ try {
+ val stream = connector.createMessageStreams(Map(TOPIC -> 1))(TOPIC).head.iterator
+ // First message should match the oldest expected offset.
+ val first = stream.next
+ assertEquals(sspMetadata.get(new Partition(48)).getOldestOffset, first.offset.toString)
+ assertEquals("val1", new String(first.message, "UTF-8"))
+ // Second message should match the newest expected offset.
+ val second = stream.next
+ assertEquals(sspMetadata.get(new Partition(48)).getNewestOffset, second.offset.toString)
+ assertEquals("val2", new String(second.message, "UTF-8"))
+ } finally {
+ connector.shutdown
+ }
+ }
+
+ @Test
+ def testNonExistentTopic {
+ val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic").asJava)
+ // Throwing (type Nothing) keeps `metadata` typed as SystemStreamMetadata; JUnit's fail()
+ // returns Unit, which would widen the getOrElse result to Any.
+ val metadata: SystemStreamMetadata = initialOffsets.asScala.getOrElse("non-existent-topic",
+ throw new AssertionError("missing metadata"))
+ // A missing topic is reported as a single empty partition 0.
+ val expected = new SystemStreamMetadata("non-existent-topic", Map(
+ new Partition(0) -> new SystemStreamPartitionMetadata("0", null, "0")).asJava)
+ // JUnit takes (expected, actual); this order keeps failure messages labelled correctly.
+ assertEquals(expected, metadata)
+ }
+
+ @Test
+ def testOffsetsAfter {
+ // getOffsetsAfter should map each SSP to the offset immediately following the one supplied.
+ val partitionZero = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
+ val partitionOne = new SystemStreamPartition("test-system", "test-stream", new Partition(1))
+ val requested = Map(partitionZero -> "1", partitionOne -> "2")
+ val next = systemAdmin.getOffsetsAfter(requested.asJava)
+ assertEquals("2", next.get(partitionZero))
+ assertEquals("3", next.get(partitionOne))
+ }
+
+ @Test
+ def testShouldCreateCoordinatorStream {
+ val topic = "test-coordinator-stream"
+ // Dedicated admin configured with a coordinator-stream replication factor of 3.
+ val coordinatorAdmin = new KafkaSystemAdmin(SYSTEM, brokerList, () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamReplicationFactor = 3)
+
+ coordinatorAdmin.createStream(StreamSpec.createCoordinatorStreamSpec(topic, "kafka"))
+ validateTopic(topic, 1)
+
+ // The created topic should have one partition (id 0) with three replicas.
+ val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), "kafka", metadataStore.getTopicInfo)
+ assertTrue(topicMetadataMap.contains(topic))
+ val onlyPartition = topicMetadataMap(topic).partitionsMetadata.head
+ assertEquals(0, onlyPartition.partitionId)
+ assertEquals(3, onlyPartition.replicas.size)
+ }
+
+ class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin(SYSTEM, brokerList, () => ZkUtils(zkConnect, 6000, 6000, zkSecure)) {
+ import kafka.api.TopicMetadata
+ // How many times getTopicMetadata has been invoked; tests use it to count retries.
+ var metadataCallCount = 0
+
+ // Regardless of the requested topics, report LEADER_NOT_AVAILABLE for topic "quux".
+ override def getTopicMetadata(topics: Set[String]) = {
+ metadataCallCount = metadataCallCount + 1
+ Map("quux" -> TopicMetadata(topic = "quux", partitionsMetadata = Seq(), error = Errors.LEADER_NOT_AVAILABLE))
+ }
+ }
+
+ @Test
+ def testShouldRetryOnTopicMetadataError {
+ // The admin keeps reporting LEADER_NOT_AVAILABLE, so retries continue until the mock
+ // backoff strategy hits its call limit of 3 and throws CallLimitReached.
+ val failingAdmin = new KafkaSystemAdminWithTopicMetadataError
+ val backoff = new ExponentialSleepStrategy.Mock(maxCalls = 3)
+ try {
+ failingAdmin.getSystemStreamMetadata(Set("quux").asJava, backoff)
+ fail("expected CallLimitReached to be thrown")
+ } catch {
+ case _: ExponentialSleepStrategy.CallLimitReached => () // expected
+ }
+ }
+
+ @Test
+ def testGetNewestOffset {
+ createTopic(TOPIC2, 16)
+ validateTopic(TOPIC2, 16)
+
+ // Sends one record to the given partition of TOPIC2 and returns the offset it was written at.
+ def sendTo(partition: Int, key: String, value: String): String =
+ producer.send(new ProducerRecord(TOPIC2, partition, key.getBytes, value.getBytes)).get().offset().toString
+
+ val underTest = new SystemStreamPartition("kafka", TOPIC2, new Partition(4))
+ val other = new SystemStreamPartition("kafka", TOPIC2, new Partition(13))
+
+ // Both partitions start empty, so neither has a newest offset.
+ assertNull(systemAdmin.getNewestOffset(underTest, 3))
+ assertNull(systemAdmin.getNewestOffset(other, 3))
+
+ // First record into partition 4; partition 13 stays empty.
+ assertEquals("0", sendTo(4, "key1", "val1"))
+ assertEquals("0", systemAdmin.getNewestOffset(underTest, 3))
+ assertNull(systemAdmin.getNewestOffset(other, 3))
+
+ // Second record into partition 4.
+ assertEquals("1", sendTo(4, "key2", "val2"))
+ assertEquals("1", systemAdmin.getNewestOffset(underTest, 3))
+ assertNull(systemAdmin.getNewestOffset(other, 3))
+
+ // One more record into each partition, then check both with a retry count of 0.
+ assertEquals("2", sendTo(4, "key3", "val3"))
+ assertEquals("0", sendTo(13, "key4", "val4"))
+ assertEquals("2", systemAdmin.getNewestOffset(underTest, 0))
+ assertEquals("0", systemAdmin.getNewestOffset(other, 0))
+ }
+
+ @Test (expected = classOf[LeaderNotAvailableException])
+ def testGetNewestOffsetMaxRetry {
+ val maxRetries = 3
+ val failingAdmin = new KafkaSystemAdminWithTopicMetadataError
+ try {
+ failingAdmin.getNewestOffset(new SystemStreamPartition(SYSTEM, "quux", new Partition(0)), maxRetries)
+ } catch {
+ case e: Exception =>
+ // One initial metadata lookup plus one per retry, then the error is rethrown for JUnit.
+ assertEquals(maxRetries + 1, failingAdmin.metadataCallCount)
+ throw e
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/282f8349/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemConsumer.scala
new file mode 100644
index 0000000..ac37619
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemConsumer.scala
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka_deprecated
+
+import kafka.api.TopicMetadata
+import kafka.api.PartitionMetadata
+import kafka.cluster.Broker
+import kafka.common.TopicAndPartition
+import kafka.message.Message
+import kafka.message.MessageAndOffset
+import org.apache.kafka.common.protocol.Errors
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.Partition
+import org.apache.samza.util.TopicMetadataStore
+import org.junit.Test
+import org.junit.Assert._
+import org.apache.samza.system.SystemAdmin
+import org.mockito.Mockito._
+import org.mockito.Matchers._
+
+/**
+ * Unit tests for the deprecated [[KafkaSystemConsumer]]: fetch-threshold splitting, broker-proxy
+ * creation on leadership change, offset registration and in-queue byte accounting.
+ */
+class TestKafkaSystemConsumer {
+ // Mocked admin used by every consumer below. JUnit 4 creates a fresh test-class instance per
+ // test method, so stubbing done in one test does not leak into another.
+ val systemAdmin: SystemAdmin = mock(classOf[KafkaSystemAdmin])
+ private val clientId = "TestClientId"
+
+ @Test
+ def testFetchThresholdShouldDivideEvenlyAmongPartitions {
+ val metadataStore = new MockMetadataStore
+ // refreshBrokers is overridden as a no-op (presumably so start does not contact real brokers).
+ val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId, fetchThreshold = 50000) {
+ override def refreshBrokers {
+ }
+ }
+
+ for (i <- 0 until 50) {
+ consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
+ }
+
+ consumer.start
+
+ // 50000 messages split evenly across 50 registered partitions.
+ assertEquals(1000, consumer.perPartitionFetchThreshold)
+ }
+
+ @Test
+ def testBrokerCreationShouldTriggerStart {
+ val systemName = "test-system"
+ val streamName = "test-stream"
+ val metrics = new KafkaSystemConsumerMetrics
+ // Lie and tell the store that the partition metadata is empty. We can't
+ // use partition metadata because it has Broker in its constructor, which
+ // is package private to Kafka.
+ val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, Errors.NONE)))
+ // Hosts whose BrokerProxy has been started, in start order.
+ var hosts = List[String]()
+ var getHostPortCount = 0
+ val consumer = new KafkaSystemConsumer(systemName, systemAdmin, metrics, metadataStore, clientId) {
+ override def getLeaderHostPort(partitionMetadata: Option[PartitionMetadata]): Option[(String, Int)] = {
+ // Generate a unique host every time getLeaderHostPort is called.
+ getHostPortCount += 1
+ // Explicit tuple: relying on argument auto-tupling for Some(a, b) is deprecated.
+ Some(("localhost-%s" format getHostPortCount, 0))
+ }
+
+ override def createBrokerProxy(host: String, port: Int): BrokerProxy = {
+ new BrokerProxy(host, port, systemName, "", metrics, sink) {
+ override def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
+ // Skip this since we normally do verification of offsets, which
+ // tries to connect to Kafka. Rather than mock that, just forget it.
+ nextOffsets.size
+ }
+
+ override def start {
+ hosts :+= host
+ }
+ }
+ }
+ }
+
+ consumer.register(new SystemStreamPartition(systemName, streamName, new Partition(0)), "1")
+ assertEquals(0, hosts.size)
+ consumer.start
+ assertEquals(List("localhost-1"), hosts)
+ // Should trigger a refresh with a new host.
+ consumer.sink.abdicate(new TopicAndPartition(streamName, 0), 2)
+ assertEquals(List("localhost-1", "localhost-2"), hosts)
+ }
+
+ @Test
+ def testConsumerRegisterOlderOffsetOfTheSamzaSSP {
+ // Use the real offset comparison so register can pick the older of two offsets.
+ when(systemAdmin.offsetComparator(anyString, anyString)).thenCallRealMethod()
+
+ val metadataStore = new MockMetadataStore
+ val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId, fetchThreshold = 50000)
+ val ssp0 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
+ val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(1))
+ val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(2))
+
+ consumer.register(ssp0, "0")
+ consumer.register(ssp0, "5")
+ consumer.register(ssp1, "2")
+ consumer.register(ssp1, "3")
+ consumer.register(ssp2, "0")
+
+ // When an SSP is registered more than once, the older offset is kept.
+ assertEquals("0", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp0)))
+ assertEquals("2", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp1)))
+ assertEquals("0", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp2)))
+ }
+
+ @Test
+ def testFetchThresholdBytesShouldDivideEvenlyAmongPartitions {
+ val metadataStore = new MockMetadataStore
+ val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId,
+ fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) {
+ override def refreshBrokers {
+ }
+ }
+
+ for (i <- 0 until 10) {
+ consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
+ }
+
+ consumer.start
+
+ // 50000 messages / 10 partitions.
+ assertEquals(5000, consumer.perPartitionFetchThreshold)
+ // NOTE(review): 3000 is half of an even split of 60000 bytes over 10 partitions;
+ // confirm this halving against KafkaSystemConsumer.
+ assertEquals(3000, consumer.perPartitionFetchThresholdBytes)
+ }
+
+ @Test
+ def testFetchThresholdBytes {
+ val metadataStore = new MockMetadataStore
+ val consumer = new KafkaSystemConsumer("test-system", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId,
+ fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) {
+ override def refreshBrokers {
+ }
+ }
+
+ for (i <- 0 until 10) {
+ consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
+ }
+
+ consumer.start
+
+ val msg = Array[Byte](5, 112, 9, 126)
+ val msgAndOffset: MessageAndOffset = MessageAndOffset(new Message(msg), 887654)
+ // Expected queue size 106 = 4 payload bytes + Kafka Message header + IncomingMessageEnvelope
+ // overhead (80). NOTE(review): the exact header size is not verified here; a breakdown of
+ // 4 + 18 + 80 would only sum to 102.
+ consumer.sink.addMessage(new TopicAndPartition("test-stream", 0), msgAndOffset, 887354)
+
+ assertEquals(106, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0))))
+ }
+
+ @Test
+ def testFetchThresholdBytesDisabled {
+ val metadataStore = new MockMetadataStore
+ // fetchLimitByBytesEnabled is left at its default here.
+ val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId,
+ fetchThreshold = 50000, fetchThresholdBytes = 60000L) {
+ override def refreshBrokers {
+ }
+ }
+
+ for (i <- 0 until 10) {
+ consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
+ }
+
+ consumer.start
+
+ // The message threshold is still split; the byte threshold and byte accounting stay at 0.
+ assertEquals(5000, consumer.perPartitionFetchThreshold)
+ assertEquals(0, consumer.perPartitionFetchThresholdBytes)
+ assertEquals(0, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0))))
+ }
+}
+
+/**
+ * Canned [[TopicMetadataStore]] for tests: ignores the requested topics and returns `metadata`.
+ * `metadata` is a var, presumably so callers can swap the canned response.
+ */
+class MockMetadataStore(var metadata: Map[String, TopicMetadata] = Map()) extends TopicMetadataStore {
+ override def getTopicInfo(topics: Set[String]): Map[String, TopicMetadata] = {
+ // The requested topic set is intentionally ignored.
+ metadata
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/282f8349/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemFactory.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemFactory.scala
new file mode 100644
index 0000000..41a48f3
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemFactory.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka_deprecated
+
+import org.apache.samza.SamzaException
+import org.apache.samza.config.MapConfig
+import org.apache.samza.config.StorageConfig
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+/** Tests for producer creation and producer-property injection in the deprecated [[KafkaSystemFactory]]. */
+class TestKafkaSystemFactory {
+ /**
+ * Asserts that creating a producer for system "test" with `config` fails with a SamzaException.
+ * Any other exception type fails the test with a descriptive message.
+ */
+ private def assertGetProducerFails(config: MapConfig): Unit = {
+ val producerFactory = new KafkaSystemFactory
+ try {
+ producerFactory.getProducer(
+ "test",
+ config,
+ new MetricsRegistryMap)
+ fail("Expected to get a Samza exception.")
+ } catch {
+ case _: SamzaException => () // expected
+ case e: Exception => fail("Expected SamzaException, but got " + e)
+ }
+ }
+
+ @Test
+ def testFailWhenNoSerdeDefined {
+ assertGetProducerFails(new MapConfig(Map[String, String]().asJava))
+ }
+
+ @Test
+ def testFailWhenSerdeIsInvalid {
+ assertGetProducerFails(new MapConfig(Map[String, String](
+ "streams.test.serde" -> "failme").asJava))
+ }
+
+ @Test
+ def testHappyPath {
+ val producerFactory = new KafkaSystemFactory
+ val config = new MapConfig(Map[String, String](
+ "job.name" -> "test",
+ "systems.test.producer.bootstrap.servers" -> "",
+ "systems.test.samza.key.serde" -> "json",
+ "systems.test.samza.msg.serde" -> "json",
+ "serializers.registry.json.class" -> "samza.serializers.JsonSerdeFactory").asJava)
+ // The same factory and config should yield a KafkaSystemProducer on repeated calls.
+ val firstProducer = producerFactory.getProducer(
+ "test",
+ config,
+ new MetricsRegistryMap)
+ assertNotNull(firstProducer)
+ assertTrue(firstProducer.isInstanceOf[KafkaSystemProducer])
+ val secondProducer = producerFactory.getProducer(
+ "test",
+ config,
+ new MetricsRegistryMap)
+ assertNotNull(secondProducer)
+ assertTrue(secondProducer.isInstanceOf[KafkaSystemProducer])
+ }
+
+ @Test
+ def testInjectedProducerProps {
+ val configMap = Map[String, String](
+ StorageConfig.FACTORY.format("system1") -> "some.factory.Class",
+ StorageConfig.CHANGELOG_STREAM.format("system1") -> "system1.stream1",
+ StorageConfig.FACTORY.format("system2") -> "some.factory.Class")
+ val config = new MapConfig(configMap.asJava)
+ // Only "system1", which hosts the changelog "system1.stream1", gets compression.type=none injected.
+ assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system3", config))
+ assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system2", config))
+ assertEquals(Map[String, String]("compression.type" -> "none"), KafkaSystemFactory.getInjectedProducerProperties("system1", config))
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/282f8349/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemProducer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemProducer.scala
new file mode 100644
index 0000000..16a1287
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemProducer.scala
@@ -0,0 +1,604 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka_deprecated
+
+import org.apache.kafka.clients.producer._
+import org.apache.kafka.common.errors.{RecordTooLargeException, SerializationException, TimeoutException}
+import org.apache.kafka.test.MockSerializer
+import org.apache.samza.system.kafka.MockKafkaProducer
+import org.apache.samza.system.{OutgoingMessageEnvelope, SystemProducerException, SystemStream}
+import org.junit.Assert._
+import org.junit.Test
+import org.scalatest.Assertions.intercept
+
+
+class TestKafkaSystemProducer {
+ // Target stream and a single byte-array message, shared by tests that just need something to send.
+ val systemStream = new SystemStream("testSystem", "testStream")
+ val someMessage = new OutgoingMessageEnvelope(systemStream, "test".getBytes)
+
+ @Test
+ def testKafkaProducer {
+ // Kafka's MockProducer (autoComplete = true) records every send in its history.
+ val mockProducer = new MockProducer(true, new MockSerializer, new MockSerializer)
+ val systemProducer = new KafkaSystemProducer(
+ systemName = "test",
+ getProducer = () => mockProducer,
+ metrics = new KafkaSystemProducerMetrics)
+ systemProducer.register("test")
+ systemProducer.start()
+ systemProducer.send("test", someMessage)
+ // Inspect the producer actually held by the system producer.
+ val activeProducer = systemProducer.producerRef.get().asInstanceOf[MockProducer[Array[Byte], Array[Byte]]]
+ assertEquals(1, activeProducer.history().size())
+ systemProducer.stop()
+ }
+
+ @Test
+ def testKafkaProducerUsingMockKafkaProducer {
+ val mockKafkaProducer = new MockKafkaProducer(1, "test", 1)
+ val systemProducer = new KafkaSystemProducer(
+ systemName = "test",
+ getProducer = () => mockKafkaProducer,
+ metrics = new KafkaSystemProducerMetrics)
+ systemProducer.register("test")
+ systemProducer.start()
+
+ // Buffering is not enabled on the mock, so a single send is counted right away.
+ systemProducer.send("test", someMessage)
+ assertEquals(1, mockKafkaProducer.getMsgsSent)
+
+ systemProducer.stop()
+ }
+
+ @Test
+ def testKafkaProducerBufferedSend {
+ val first = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+ val second = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+ val third = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+
+ val mockProducer = new MockKafkaProducer(1, "test", 1)
+ val systemProducer = new KafkaSystemProducer(
+ systemName = "test",
+ getProducer = () => mockProducer,
+ metrics = new KafkaSystemProducerMetrics)
+ systemProducer.register("test")
+ systemProducer.start()
+
+ // The first send goes straight through.
+ systemProducer.send("test", first)
+
+ // With buffering on, the mock holds back the next two sends.
+ mockProducer.setShouldBuffer(true)
+ systemProducer.send("test", second)
+ systemProducer.send("test", third)
+ assertEquals(1, mockProducer.getMsgsSent)
+
+ // Release the buffered messages on a background thread and wait for it to finish.
+ mockProducer.startDelayedSendThread(2000).join()
+ assertEquals(3, mockProducer.getMsgsSent)
+
+ systemProducer.stop()
+ }
+
+ @Test
+ def testKafkaProducerFlushSuccessful {
+ val first = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+ val second = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+ val third = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+
+ val mockProducer = new MockKafkaProducer(1, "test", 1)
+ val systemProducer = new KafkaSystemProducer(
+ systemName = "test",
+ getProducer = () => mockProducer,
+ metrics = new KafkaSystemProducerMetrics)
+ systemProducer.register("test")
+ systemProducer.start()
+ systemProducer.send("test", first)
+
+ // Buffer the next two sends so only the first has been delivered.
+ mockProducer.setShouldBuffer(true)
+ systemProducer.send("test", second)
+ systemProducer.send("test", third)
+ assertEquals(1, mockProducer.getMsgsSent)
+
+ // Release the buffered messages in the background; after flush returns all three are sent.
+ mockProducer.startDelayedSendThread(2000)
+ systemProducer.flush("test")
+ assertEquals(3, mockProducer.getMsgsSent)
+ systemProducer.stop()
+ }
+
+ @Test
+ def testKafkaProducerFlushWithException {
+ val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+ val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+ val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+ val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+
+ val mockProducer = new MockKafkaProducer(1, "test", 1)
+ val systemProducer = new KafkaSystemProducer(systemName = "test",
+ getProducer = () => mockProducer,
+ metrics = new KafkaSystemProducerMetrics())
+ systemProducer.register("test")
+ systemProducer.start()
+ systemProducer.send("test", msg1)
+
+ // Buffer the remaining sends; msg3's callback is set to fail with RecordTooLargeException.
+ mockProducer.setShouldBuffer(true)
+ systemProducer.send("test", msg2)
+ mockProducer.setErrorNext(true, true, new RecordTooLargeException())
+ systemProducer.send("test", msg3)
+ systemProducer.send("test", msg4)
+
+ assertEquals(1, mockProducer.getMsgsSent)
+
+ mockProducer.startDelayedSendThread(2000)
+ // flush must surface the callback failure. intercept already checks the exception type,
+ // so a separate isInstanceOf assertion would be redundant.
+ intercept[SystemProducerException] {
+ systemProducer.flush("test")
+ }
+ assertEquals(3, mockProducer.getMsgsSent) // msg1, msg2 and msg4 will be sent
+ systemProducer.stop()
+ }
+
+ @Test
+ def testKafkaProducerWithRetriableException {
+ val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+ val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+ val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+ val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+
+ val mockProducer = new MockKafkaProducer(1, "test", 1)
+ val producerMetrics = new KafkaSystemProducerMetrics()
+ val producer = new KafkaSystemProducer(systemName = "test",
+ getProducer = () => mockProducer,
+ metrics = producerMetrics)
+
+ producer.register("test")
+ producer.start()
+ // Three successful sends, flushed cleanly.
+ producer.send("test", msg1)
+ producer.send("test", msg2)
+ producer.send("test", msg3)
+ producer.flush("test")
+
+ // Fail the next send's callback with a TimeoutException (a retriable Kafka error).
+ mockProducer.setErrorNext(true, true, new TimeoutException())
+
+ producer.send("test", msg4)
+ // Even a retriable error is surfaced on flush; intercept already enforces the exception type.
+ val thrown = intercept[SystemProducerException] {
+ producer.flush("test")
+ }
+ assertTrue(thrown.getCause.getCause.isInstanceOf[TimeoutException])
+ assertEquals(3, mockProducer.getMsgsSent)
+ producer.stop()
+ }
+
+ /**
+ * If there's an exception, we should:
+ * 1. Close the producer (from the one-and-only Kafka send thread) to prevent subsequent sends from going out of order.
+ * 2. Record the original exception.
+ * 3. Throw that exception every time a KafkaSystemProducer method is invoked until the container fails.
+ *
+ * Assumptions:
+ * 1. SystemProducer.flush() can happen concurrently with SystemProducer.send() for a particular TaskInstance (task.async.commit).
+ * 2. SystemProducer.flush() cannot happen concurrently with itself for a particular task instance.
+ * 3. Any exception thrown from SystemProducer.flush() will prevent checkpointing and fail the container.
+ * 4. A single KafkaProducer is shared by all the tasks, so any failure from one task can affect the others.
+ *
+ * Conclusions:
+ * The only safe way to handle the async exceptions is to close the producer and fail the container.
+ * This prevents race conditions between setting/clearing exceptions and recreating the producer, which could
+ * cause data loss by checkpointing a failed offset.
+ *
+ * Inaccuracies:
+ * A real Kafka producer succeeds or fails all the messages in a batch together. This test, however, fails
+ * individual callbacks in order to test boundary conditions where the batches align perfectly around the failed send().
+ */
+ @Test
+ def testKafkaProducerWithFatalExceptions {
+ // Checks that the SystemProducerException wraps the injected RecordTooLargeException two levels down.
+ def assertCausedByRecordTooLarge(e: SystemProducerException): Unit =
+ assertTrue(e.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+ val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+ val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+ val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+ val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+ val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+ val producerMetrics = new KafkaSystemProducerMetrics()
+
+ val mockProducer = new MockKafkaProducer(1, "test", 1)
+ val producer = new KafkaSystemProducer(
+ systemName = "test",
+ getProducer = () => {
+ mockProducer.open() // A new producer would not already be closed, so reset it.
+ mockProducer
+ },
+ metrics = producerMetrics)
+ producer.register("test")
+ producer.start()
+
+ // Two good sends, then one whose callback fails.
+ producer.send("test", msg1)
+ producer.send("test", msg2)
+ mockProducer.setErrorNext(true, true, new RecordTooLargeException())
+ producer.send("test", msg3) // Callback exception
+ // The failure closes the producer without opening a replacement.
+ assertTrue(mockProducer.isClosed)
+ assertNotNull(producer.producerRef.get())
+ assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)
+
+ // From here on, every send and flush rethrows the original callback failure.
+ assertCausedByRecordTooLarge(intercept[SystemProducerException] {
+ producer.send("test", msg4) // Should fail because the producer is closed.
+ })
+ assertCausedByRecordTooLarge(intercept[SystemProducerException] {
+ producer.flush("test") // Should throw the callback exception
+ })
+ assertCausedByRecordTooLarge(intercept[SystemProducerException] {
+ producer.send("test", msg5) // Should not be able to send again after flush
+ })
+ assertCausedByRecordTooLarge(intercept[SystemProducerException] {
+ producer.flush("test") // Should rethrow the exception
+ })
+
+ assertEquals(2, mockProducer.getMsgsSent) // only the messages before the error get sent
+ producer.stop()
+ }
+
+ /**
+ * Recapping from [[testKafkaProducerWithFatalExceptions]]:
+ *
+ * If there's an exception, we should:
+ * 1. Close the producer (from the one-and-only Kafka send thread) to prevent subsequent sends from going out of order.
+ * 2. Record the original exception.
+ * 3. Throw that exception every time a KafkaSystemProducer method is invoked until the container fails.
+ *
+ * This test focuses on point 3. In particular, it ensures that failures are handled properly across multiple sources
+ * that share the same producer.
+ */
+ @Test
+ def testKafkaProducerWithFatalExceptionsMultipleSources {
+ val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+ val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+ val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+ val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+ val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+ val msg6 = new OutgoingMessageEnvelope(systemStream, "f".getBytes)
+ val msg7 = new OutgoingMessageEnvelope(systemStream, "g".getBytes)
+ val producerMetrics = new KafkaSystemProducerMetrics()
+
+ val mockProducer = new MockKafkaProducer(1, "test", 1)
+ val producer = new KafkaSystemProducer(systemName = "test",
+ getProducer = () => {
+ mockProducer.open() // A new producer would not already be closed, so reset it.
+ mockProducer
+ },
+ metrics = producerMetrics)
+ producer.register("test1")
+ producer.register("test2")
+
+ producer.start()
+
+ // Initial sends
+ producer.send("test1", msg1)
+ producer.send("test2", msg2)
+
+ // Inject error for next send
+ mockProducer.setErrorNext(true, true, new RecordTooLargeException())
+ producer.send("test1", msg3) // Callback exception
+ assertTrue(mockProducer.isClosed)
+ assertNotNull(producer.producerRef.get())
+ assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)
+
+ // Subsequent sends from either source fail with the original error.
+ // (intercept already enforces the SystemProducerException type.)
+ val senderException = intercept[SystemProducerException] {
+ producer.send("test1", msg4) // Should fail because the producer is closed.
+ }
+ assertTrue(senderException.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+ val otherSourceException = intercept[SystemProducerException] {
+ producer.send("test2", msg4) // First send from the other source also gets the original error
+ }
+ assertTrue(otherSourceException.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+ val otherSourceException2 = intercept[SystemProducerException] {
+ producer.send("test2", msg5) // Second send should still get the error
+ }
+ assertTrue(otherSourceException2.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+ // Flushes
+ val flushException = intercept[SystemProducerException] {
+ producer.flush("test2") // Should rethrow the original error in flush
+ }
+ assertTrue(flushException.getCause.getCause.isInstanceOf[RecordTooLargeException])
+ intercept[SystemProducerException] {
+ producer.send("test2", msg6) // Should still not be able to send after flush
+ }
+
+ val flushException2 = intercept[SystemProducerException] {
+ producer.flush("test1") // Should throw the callback exception
+ }
+ assertTrue(flushException2.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+ intercept[SystemProducerException] {
+ producer.send("test1", msg7) // Should still not be able to send after flush
+ }
+
+ intercept[SystemProducerException] {
+ producer.flush("test1") // Should throw the callback exception
+ }
+ assertEquals(2, mockProducer.getMsgsSent)
+ producer.stop()
+ }
+
+  /**
+   * A synchronous (user-thread) send error, such as a serialization failure, must only affect the send that
+   * raised it: the exception reaches the caller, the shared Kafka producer is neither closed nor recreated,
+   * the same message can be resent, and the other registered source keeps working.
+   */
+  @Test
+  def testKafkaProducerWithNonFatalExceptionsMultipleSources: Unit = {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+    val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+    val producerMetrics = new KafkaSystemProducerMetrics()
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val producer = new KafkaSystemProducer(systemName = "test",
+      getProducer = () => {
+        mockProducer.open() // A new producer would not already be closed, so reset it.
+        mockProducer
+      },
+      metrics = producerMetrics)
+    producer.register("test1")
+    producer.register("test2")
+    producer.start()
+
+    producer.send("test1", msg1)
+    producer.send("test2", msg2)
+    mockProducer.setErrorNext(true, false, new SerializationException())
+    val sendException = intercept[SystemProducerException] {
+      producer.send("test1", msg3) // User-thread exception
+    }
+    assertTrue(sendException.getCause.isInstanceOf[SerializationException])
+    // A synchronous failure must leave the shared producer untouched.
+    assertFalse(mockProducer.isClosed)
+    assertNotNull(producer.producerRef.get())
+    assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)
+
+    producer.send("test1", msg3) // Should be able to resend msg3
+    producer.send("test2", msg4) // Second source should not be affected
+
+    producer.flush("test1") // Flush should be unaffected
+
+    producer.send("test1", msg5) // Should be able to send again after flush
+
+    // msg1, msg2, msg3 (resent), msg4 and msg5 all go out; only the rejected first attempt at msg3 is missing.
+    assertEquals(5, mockProducer.getMsgsSent)
+    producer.stop()
+  }
+
+  /**
+   * Verifies recovery from an asynchronous (callback) send failure when the user has enabled
+   * task.drop.producer.errors. In that case the system producer is expected to:
+   *  1. Close the Kafka producer (from the single Kafka send thread) so later sends cannot go out of order.
+   *  2. Create a replacement producer on the next send.
+   *  3. Drop whatever messages were lost; users accept this trade-off when they enable the option.
+   *
+   * Assumptions:
+   *  1. SystemProducer.flush() may run concurrently with SystemProducer.send() for one TaskInstance (task.async.commit).
+   *  2. SystemProducer.flush() never runs concurrently with itself for one TaskInstance.
+   *  3. An exception thrown from SystemProducer.flush() prevents the checkpoint and fails the container.
+   *  4. All tasks share a single KafkaProducer, so a failure caused by one task can affect the others.
+   *
+   * Conclusions:
+   *  When the user prefers availability over completeness, every exception is swallowed and the producer is
+   *  recreated to recover. There is no bound on how many messages are lost, but the send-failed metric should
+   *  stay accurate, so users should alert on it.
+   *
+   * Inaccuracies:
+   *  A real Kafka producer succeeds or fails a whole batch together. This test instead fails one individual
+   *  callback, which exercises the boundary case where batches line up exactly around the failing send().
+   */
+  @Test
+  def testKafkaProducerWithFatalExceptionsDroppingExceptions: Unit = {
+    def envelope(payload: String) = new OutgoingMessageEnvelope(systemStream, payload.getBytes)
+    val msg1 = envelope("a")
+    val msg2 = envelope("b")
+    val msg3 = envelope("c")
+    val msg4 = envelope("d")
+    val msg5 = envelope("e")
+    val sysMetrics = new KafkaSystemProducerMetrics()
+
+    val kafkaMock = new MockKafkaProducer(1, "test", 1)
+    // Every "new" Kafka producer is really this one mock; reopen it so it never starts out closed.
+    val reopenMock = () => {
+      kafkaMock.open()
+      kafkaMock
+    }
+    val producer = new KafkaSystemProducer(
+      systemName = "test",
+      getProducer = reopenMock,
+      metrics = sysMetrics,
+      dropProducerExceptions = true) // Exception dropping is the behavior under test.
+    producer.register("test")
+    producer.start()
+
+    // Two healthy sends, then make the next send fail from its callback.
+    producer.send("test", msg1)
+    producer.send("test", msg2)
+    kafkaMock.setErrorNext(true, true, new RecordTooLargeException())
+    producer.send("test", msg3)
+    // The failure closes the producer and clears the reference, but no replacement has been created yet.
+    assertTrue(kafkaMock.isClosed)
+    assertNull(producer.producerRef.get())
+    assertEquals("Should not have created a new producer", 1, kafkaMock.getOpenCount)
+
+    // The next send recreates the producer and succeeds.
+    producer.send("test", msg4)
+    assertFalse(kafkaMock.isClosed)
+    assertNotNull(producer.producerRef.get())
+    assertEquals("Should have created a new producer", 2, kafkaMock.getOpenCount)
+    producer.flush("test") // The dropped error must not resurface here.
+
+    // After the flush, sending continues on the same replacement producer.
+    producer.send("test", msg5)
+    assertEquals("Should not have created a new producer", 2, kafkaMock.getOpenCount)
+    producer.flush("test")
+
+    // Only msg3 was lost; msg1, msg2, msg4 and msg5 were sent.
+    assertEquals(4, kafkaMock.getMsgsSent)
+    producer.stop()
+  }
+
+  /**
+   * Multi-source variant of [[testKafkaProducerWithFatalExceptionsDroppingExceptions]].
+   *
+   * With task.drop.producer.errors enabled, an asynchronous send failure should:
+   *  1. Close the shared Kafka producer (from the single Kafka send thread) so later sends cannot go out of order.
+   *  2. Create a replacement producer on the next send.
+   *  3. Drop the lost messages silently (users accept this when they enable the option).
+   *
+   * Every source sharing the producer must be able to keep sending and flushing without seeing the dropped error.
+   */
+  @Test
+  def testKafkaProducerWithFatalExceptionsMultipleSourcesDroppingExceptions: Unit = {
+    def envelope(payload: String) = new OutgoingMessageEnvelope(systemStream, payload.getBytes)
+    val msg1 = envelope("a")
+    val msg2 = envelope("b")
+    val msg3 = envelope("c")
+    val msg4 = envelope("d")
+    val msg5 = envelope("e")
+    val msg6 = envelope("f")
+    val msg7 = envelope("g")
+    val sysMetrics = new KafkaSystemProducerMetrics()
+
+    val kafkaMock = new MockKafkaProducer(1, "test", 1)
+    // Every "new" Kafka producer is really this one mock; reopen it so it never starts out closed.
+    val reopenMock = () => {
+      kafkaMock.open()
+      kafkaMock
+    }
+    val producer = new KafkaSystemProducer(
+      systemName = "test",
+      getProducer = reopenMock,
+      metrics = sysMetrics,
+      dropProducerExceptions = true) // Exception dropping is the behavior under test.
+    Seq("test1", "test2").foreach(producer.register)
+
+    producer.start()
+
+    // One healthy send per source.
+    producer.send("test1", msg1)
+    producer.send("test2", msg2)
+
+    // Fail the next send from its callback.
+    kafkaMock.setErrorNext(true, true, new RecordTooLargeException())
+    producer.send("test1", msg3)
+    assertTrue(kafkaMock.isClosed)
+    assertNull(producer.producerRef.get())
+    assertEquals("Should not have created a new producer", 1, kafkaMock.getOpenCount)
+
+    // Recovery: the first source's next send recreates the producer...
+    producer.send("test1", msg4)
+    assertFalse(kafkaMock.isClosed)
+    assertEquals("Should have created a new producer", 2, kafkaMock.getOpenCount)
+    assertNotNull(producer.producerRef.get())
+    // ...and the second source reuses it without any error.
+    producer.send("test2", msg5)
+    assertEquals("Should not have created a new producer", 2, kafkaMock.getOpenCount)
+
+    // Neither source sees the dropped error on flush, and both can keep sending afterwards.
+    producer.flush("test2")
+    producer.send("test2", msg6)
+    producer.flush("test1")
+    producer.send("test1", msg7)
+
+    // Only msg3 was lost.
+    assertEquals(6, kafkaMock.getMsgsSent)
+    producer.stop()
+  }
+
+  /**
+   * Enabling exception dropping (dropProducerExceptions, i.e. task.drop.producer.errors) must not hide
+   * synchronous send errors. A serialization failure is still thrown to the caller. The shared Kafka producer
+   * is neither closed nor recreated, the message can be resent, and the other registered source is unaffected.
+   */
+  @Test
+  def testKafkaProducerWithNonFatalExceptionsMultipleSourcesDroppingExceptions: Unit = {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+    val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+    val producerMetrics = new KafkaSystemProducerMetrics()
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val producer = new KafkaSystemProducer(systemName = "test",
+      getProducer = () => {
+        mockProducer.open() // A new producer would not already be closed, so reset it.
+        mockProducer
+      },
+      metrics = producerMetrics,
+      dropProducerExceptions = true) // Here's where we enable exception dropping.
+    producer.register("test1")
+    producer.register("test2")
+    producer.start()
+
+    producer.send("test1", msg1)
+    producer.send("test2", msg2)
+    mockProducer.setErrorNext(true, false, new SerializationException())
+    // Even with dropping enabled, the synchronous failure is surfaced to the caller.
+    val sendException = intercept[SystemProducerException] {
+      producer.send("test1", msg3) // User-thread exception
+    }
+    assertTrue(sendException.getCause.isInstanceOf[SerializationException])
+    assertFalse(mockProducer.isClosed)
+    assertNotNull(producer.producerRef.get()) // Synchronous error; producer should not be recreated
+    assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)
+
+    producer.send("test1", msg3) // Should be able to resend msg3
+    assertFalse(mockProducer.isClosed)
+    assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)
+    assertNotNull(producer.producerRef.get())
+    producer.send("test2", msg4) // Second source should not be affected
+
+    producer.flush("test1") // Flush should be unaffected
+
+    producer.send("test1", msg5) // Should be able to send again after flush
+
+    // msg1, msg2, msg3 (resent), msg4 and msg5 all go out; only the rejected first attempt at msg3 is missing.
+    assertEquals(5, mockProducer.getMsgsSent)
+    producer.stop()
+  }
+
+  /**
+   * Messages still held back by the Kafka producer must have been sent by the time stop() returns,
+   * including messages from more than one registered source.
+   */
+  @Test
+  def testKafkaProducerFlushMsgsWhenStop: Unit = {
+    val first = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val second = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val third = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val otherSourceMsg = new OutgoingMessageEnvelope(new SystemStream("test2", "test"), "d".getBytes)
+
+    val kafkaMock = new MockKafkaProducer(1, "test", 1)
+    val underTest = new KafkaSystemProducer(
+      systemName = "test",
+      getProducer = () => kafkaMock,
+      metrics = new KafkaSystemProducerMetrics)
+    Seq("test", "test2").foreach(underTest.register)
+    underTest.start()
+
+    // Sent straight through, before buffering is switched on.
+    underTest.send("test", first)
+
+    // From here on the mock holds messages back instead of counting them as sent.
+    kafkaMock.setShouldBuffer(true)
+    underTest.send("test", second)
+    underTest.send("test", third)
+    underTest.send("test2", otherSourceMsg)
+    assertEquals(1, kafkaMock.getMsgsSent)
+
+    // The delayed sender (presumably waiting ~2000 ms before releasing the buffer) has not sent anything yet...
+    kafkaMock.startDelayedSendThread(2000)
+    assertEquals(1, kafkaMock.getMsgsSent)
+
+    // ...but once stop() returns, every buffered message must have gone out.
+    underTest.stop()
+    assertEquals(4, kafkaMock.getMsgsSent)
+  }
+
+  /**
+   * send() must reject an envelope whose stream name is null or empty with an IllegalArgumentException
+   * naming the offending system stream, and nothing may reach the underlying Kafka producer.
+   */
+  @Test
+  def testSystemStreamNameNullOrEmpty: Unit = {
+    val omeStreamNameNull = new OutgoingMessageEnvelope(new SystemStream("test", null), "a".getBytes)
+    val omeStreamNameEmpty = new OutgoingMessageEnvelope(new SystemStream("test", ""), "a".getBytes)
+    val mockProducer = new MockKafkaProducer(1, "testMock", 1)
+    val producer = new KafkaSystemProducer(systemName = "test", getProducer = () => mockProducer,
+      metrics = new KafkaSystemProducerMetrics)
+
+    // Set up outside intercept so that only send() can satisfy the expected exception, and start exactly once.
+    producer.register("test1")
+    producer.register("test2")
+    producer.start()
+
+    val thrownNull = intercept[IllegalArgumentException] {
+      producer.send("test1", omeStreamNameNull)
+    }
+    val thrownEmpty = intercept[IllegalArgumentException] {
+      producer.send("test2", omeStreamNameEmpty)
+    }
+
+    // Checked outside intercept: inside it, code after the throwing send() would never run.
+    assertEquals(0, mockProducer.getMsgsSent)
+    assertEquals("Invalid system stream: " + omeStreamNameNull.getSystemStream, thrownNull.getMessage())
+    assertEquals("Invalid system stream: " + omeStreamNameEmpty.getSystemStream, thrownEmpty.getMessage())
+    producer.stop()
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/282f8349/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestTopicMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestTopicMetadataCache.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestTopicMetadataCache.scala
new file mode 100644
index 0000000..dda011b
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestTopicMetadataCache.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka_deprecated
+
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.concurrent.atomic.AtomicReference
+
+import kafka.api.TopicMetadata
+import org.I0Itec.zkclient.ZkClient
+import org.apache.samza.util.Clock
+import org.apache.samza.util.TopicMetadataStore
+import org.junit.Assert._
+import org.junit.Before
+import org.junit.Test
+import kafka.common.ErrorMapping
+import kafka.api.PartitionMetadata
+import org.apache.kafka.common.protocol.Errors
+
+import scala.util.control.NonFatal
+
+class TestTopicMetadataCache {
+
+  /** Manually driven [[Clock]]: tests set `currentValue` to control what `currentTimeMillis` reports. */
+  class MockTime extends Clock {
+    var currentValue = 0
+
+    def currentTimeMillis: Long = currentValue
+  }
+
+  /**
+   * In-memory [[TopicMetadataStore]] serving canned metadata for "topic1" and "topic2". It counts how often
+   * it is queried, so tests can tell cache hits apart from store lookups.
+   */
+  class MockTopicMetadataStore extends TopicMetadataStore {
+    var mockCache = Map(
+      "topic1" -> new TopicMetadata("topic1", List.empty, Errors.NONE),
+      "topic2" -> new TopicMetadata("topic2", List.empty, Errors.NONE))
+    val numberOfCalls: AtomicInteger = new AtomicInteger(0)
+
+    /** Returns the canned metadata for every requested topic (throws for unknown topics), then counts the call. */
+    def getTopicInfo(topics: Set[String]): Map[String, TopicMetadata] = {
+      val topicMetadata = topics.map(topic => topic -> mockCache(topic)).toMap
+      numberOfCalls.getAndIncrement
+      topicMetadata
+    }
+
+    /** Replaces the canned metadata of `topic` with a partition-less entry carrying the given Kafka error code. */
+    def setErrorCode(topic: String, errorCode: Short): Unit = {
+      mockCache += topic -> new TopicMetadata(topic, List.empty, Errors.forCode(errorCode))
+    }
+  }
+
+  @Before def setup: Unit = {
+    // The cache is global state; start every test from an empty cache.
+    TopicMetadataCache.clear
+  }
+
+  @Test
+  def testBasicMetadataCacheFunctionality: Unit = {
+    val mockStore = new MockTopicMetadataStore
+    val mockTime = new MockTime
+    // Looks up "topic1" through the cache with a TTL of 5 at whatever time mockTime currently reports.
+    def fetchTopic1() =
+      TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
+
+    // t=0, empty cache: the store must be queried. Error code 3 is UNKNOWN_TOPIC_OR_PARTITION.
+    mockStore.setErrorCode("topic1", 3)
+    val initial = fetchTopic1()
+    assertEquals("topic1", initial("topic1").topic)
+    assertEquals(3, initial("topic1").error.code)
+    assertEquals(1, mockStore.numberOfCalls.get())
+
+    // t=5: the cached entry carries an error code, so the store is queried again to refresh it.
+    mockTime.currentValue = 5
+    mockStore.setErrorCode("topic1", 0)
+    val refreshed = fetchTopic1()
+    assertEquals("topic1", refreshed("topic1").topic)
+    assertEquals(0, refreshed("topic1").error.code)
+    assertEquals(2, mockStore.numberOfCalls.get())
+
+    // Still t=5: the entry was just refreshed and is error-free, so it is served from the cache.
+    val cached = fetchTopic1()
+    assertEquals("topic1", cached("topic1").topic)
+    assertEquals(0, cached("topic1").error.code)
+    assertEquals(2, mockStore.numberOfCalls.get())
+
+    // t=11: more than the TTL of 5 has elapsed since the last refresh at t=5, so the store is queried again.
+    mockTime.currentValue = 11
+    val expired = fetchTopic1()
+    assertEquals("topic1", expired("topic1").topic)
+    assertEquals(0, expired("topic1").error.code)
+    assertEquals(3, mockStore.numberOfCalls.get())
+  }
+
+  @Test
+  def testMultiThreadedInteractionForTopicMetadataCache: Unit = {
+    val mockStore = new MockTopicMetadataStore
+    val mockTime = new MockTime
+    val numThreads = 3
+    // Each worker waits here until all have started, so their cache lookups overlap as much as possible.
+    val waitForThreadStart = new CountDownLatch(numThreads)
+    // Starts true and latches to false on the first failed check; compareAndSet never turns it back to true.
+    val allAssertionsPassed = new AtomicBoolean(true)
+    // An exception in a worker would otherwise only end that thread, and the test could still pass.
+    val workerFailure = new AtomicReference[Throwable]()
+
+    // Add topic to the cache from multiple threads and ensure the store is called only once
+    mockTime.currentValue = 17
+    val threads = (0 until numThreads).map { _ =>
+      new Thread(new Runnable {
+        def run(): Unit = {
+          try {
+            waitForThreadStart.countDown()
+            waitForThreadStart.await()
+            val metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
+            allAssertionsPassed.compareAndSet(true, metadata("topic1").topic.equals("topic1"))
+            allAssertionsPassed.compareAndSet(true, metadata("topic1").error.code == 0)
+          } catch {
+            case NonFatal(e) => workerFailure.compareAndSet(null, e)
+          }
+        }
+      })
+    }
+    threads.foreach(_.start())
+    threads.foreach(_.join())
+
+    assertNull("A worker thread failed: " + workerFailure.get(), workerFailure.get())
+    assertTrue(allAssertionsPassed.get())
+    assertEquals(1, mockStore.numberOfCalls.get())
+  }
+
+  @Test
+  def testBadErrorCodes: Unit = {
+    val partitionMetadataBad = new PartitionMetadata(0, None, Seq(), error = Errors.LEADER_NOT_AVAILABLE)
+    val partitionMetadataGood = new PartitionMetadata(0, None, Seq(), error = Errors.NONE)
+    // A topic-level error and a partition-level error should each mark the metadata as bad...
+    assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, Errors.REQUEST_TIMED_OUT)))
+    assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataBad), Errors.NONE)))
+    // ...while error-free metadata, with or without partitions, should not.
+    assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, Errors.NONE)))
+    assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataGood), Errors.NONE)))
+  }
+}