You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/04/08 01:25:43 UTC
[11/36] samza git commit: SAMZA-592;
ignore replica not available exceptions, and be more aggressive about
refreshing kafka topic metadata
SAMZA-592; ignore replica not available exceptions, and be more aggressive about refreshing kafka topic metadata
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1202df41
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1202df41
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1202df41
Branch: refs/heads/samza-sql
Commit: 1202df4171f44a877997301218fe3c3037abf2d5
Parents: 23c4b39
Author: Chris Riccomini <cr...@apache.org>
Authored: Fri Mar 13 14:52:03 2015 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Fri Mar 13 14:52:03 2015 -0700
----------------------------------------------------------------------
.../kafka/KafkaCheckpointManager.scala | 7 ++--
.../apache/samza/system/kafka/BrokerProxy.scala | 3 +-
.../apache/samza/system/kafka/GetOffset.scala | 7 ++--
.../samza/system/kafka/KafkaSystemAdmin.scala | 10 ++---
.../samza/system/kafka/TopicMetadataCache.scala | 12 +++++-
.../scala/org/apache/samza/util/KafkaUtil.scala | 39 ++++++++++++++++----
.../system/kafka/TestKafkaSystemAdmin.scala | 8 ++--
.../system/kafka/TestTopicMetadataCache.scala | 14 ++++++-
.../org/apache/samza/utils/TestKafkaUtil.scala | 33 +++++++++++++++++
.../test/integration/TestStatefulTask.scala | 8 +---
10 files changed, 107 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 4a1b31f..c9504ec 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -44,6 +44,7 @@ import org.apache.samza.util.TopicMetadataStore
import scala.collection.mutable
import java.util.Properties
import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
+import org.apache.samza.util.KafkaUtil
/**
* Kafka checkpoint manager is used to store checkpoints in a Kafka topic.
@@ -159,7 +160,7 @@ class KafkaCheckpointManager(
.get(topicAndPartition)
.getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:0" format checkpointTopic))
// Fail or retry if there was an an issue with the offset request.
- ErrorMapping.maybeThrowException(offsetResponse.error)
+ KafkaUtil.maybeThrowException(offsetResponse.error)
val offset: Long = offsetResponse
.offsets
@@ -287,7 +288,7 @@ class KafkaCheckpointManager(
warn("Got an offset out of range exception while getting last entry in %s for topic %s and partition 0, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (entryType, checkpointTopic))
return
}
- ErrorMapping.maybeThrowException(errorCode)
+ KafkaUtil.maybeThrowException(errorCode)
}
for (response <- fetchResponse.messageSet(checkpointTopic, 0)) {
@@ -385,7 +386,7 @@ class KafkaCheckpointManager(
loop => {
val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, metadataStore.getTopicInfo)
val topicMetadata = topicMetadataMap(checkpointTopic)
- ErrorMapping.maybeThrowException(topicMetadata.errorCode)
+ KafkaUtil.maybeThrowException(topicMetadata.errorCode)
val partitionCount = topicMetadata.partitionsMetadata.length
if (partitionCount != 1) {
http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index c6e231a..614f33f 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -34,6 +34,7 @@ import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX
import scala.collection.JavaConversions._
import scala.collection.concurrent
import scala.collection.mutable
+import org.apache.samza.util.KafkaUtil
/**
* Companion object for class JvmMetrics encapsulating various constants
@@ -235,7 +236,7 @@ class BrokerProxy(
// handle the recoverable errors.
remainingErrors.foreach(e => {
warn("Got non-recoverable error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s" format remainingErrors.mkString(","))
- ErrorMapping.maybeThrowException(e.code) })
+ KafkaUtil.maybeThrowException(e.code) })
notLeaderOrUnknownTopic.foreach(e => abdicate(e.tp))
http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
index 147aabc..5528702 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
@@ -21,12 +21,13 @@
package org.apache.samza.system.kafka
-import kafka.common.{ OffsetOutOfRangeException, ErrorMapping }
+import kafka.common.OffsetOutOfRangeException
import kafka.api._
import kafka.common.TopicAndPartition
import kafka.api.PartitionOffsetRequestInfo
import org.apache.samza.util.Logging
import kafka.message.MessageAndOffset
+import org.apache.samza.util.KafkaUtil
/**
* GetOffset validates offsets for topic partitions, and manages fetching new
@@ -59,7 +60,7 @@ class GetOffset(
val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
if (messages.hasError) {
- ErrorMapping.maybeThrowException(messages.errorCode(topicAndPartition.topic, topicAndPartition.partition))
+ KafkaUtil.maybeThrowException(messages.errorCode(topicAndPartition.topic, topicAndPartition.partition))
}
info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
@@ -85,7 +86,7 @@ class GetOffset(
.get(topicAndPartition)
.getOrElse(toss("Unable to find offset information for %s" format topicAndPartition))
- ErrorMapping.maybeThrowException(partitionOffsetResponse.error)
+ KafkaUtil.maybeThrowException(partitionOffsetResponse.error)
partitionOffsetResponse
.offsets
http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index b790be1..f783c57 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -28,12 +28,13 @@ import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, Logging}
import kafka.api._
import kafka.consumer.SimpleConsumer
-import kafka.common.{TopicExistsException, TopicAndPartition, ErrorMapping}
+import kafka.common.{TopicExistsException, TopicAndPartition}
import java.util.{Properties, UUID}
import scala.collection.JavaConversions._
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
import kafka.consumer.ConsumerConfig
import kafka.admin.AdminUtils
+import org.apache.samza.util.KafkaUtil
object KafkaSystemAdmin extends Logging {
/**
@@ -222,12 +223,11 @@ class KafkaSystemAdmin(
.values
// Convert the topic metadata to a Seq[(Broker, TopicAndPartition)]
.flatMap(topicMetadata => {
- ErrorMapping.maybeThrowException(topicMetadata.errorCode)
+ KafkaUtil.maybeThrowException(topicMetadata.errorCode)
topicMetadata
.partitionsMetadata
// Convert Seq[PartitionMetadata] to Seq[(Broker, TopicAndPartition)]
.map(partitionMetadata => {
- ErrorMapping.maybeThrowException(partitionMetadata.errorCode)
val topicAndPartition = new TopicAndPartition(topicMetadata.topic, partitionMetadata.partitionId)
val leader = partitionMetadata
.leader
@@ -263,7 +263,7 @@ class KafkaSystemAdmin(
.getOffsetsBefore(new OffsetRequest(partitionOffsetInfo))
.partitionErrorAndOffsets
.mapValues(partitionErrorAndOffset => {
- ErrorMapping.maybeThrowException(partitionErrorAndOffset.error)
+ KafkaUtil.maybeThrowException(partitionErrorAndOffset.error)
partitionErrorAndOffset.offsets.head
})
@@ -320,7 +320,7 @@ class KafkaSystemAdmin(
val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout)
val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo)
val topicMetadata = topicMetadataMap(topicName)
- ErrorMapping.maybeThrowException(topicMetadata.errorCode)
+ KafkaUtil.maybeThrowException(topicMetadata.errorCode)
val partitionCount = topicMetadata.partitionsMetadata.length
if (partitionCount < numKafkaChangelogPartitions) {
http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
index 4a49d22..82ecf1a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
@@ -21,7 +21,7 @@ package org.apache.samza.system.kafka
import org.apache.samza.util.Logging
import kafka.api.TopicMetadata
-import kafka.common.ErrorMapping
+import org.apache.samza.util.KafkaUtil
/**
* TopicMetadataCache is used to cache all the topic metadata for Kafka per
@@ -43,7 +43,7 @@ object TopicMetadataCache extends Logging {
val missingTopics = topics.filter(topic => !topicMetadataMap.contains(systemName, topic))
val topicsWithBadOrExpiredMetadata = (topics -- missingTopics).filter(topic => {
val metadata = topicMetadataMap(systemName, topic)
- metadata.streamMetadata.errorCode != ErrorMapping.NoError || ((time - metadata.lastRefreshMs) > cacheTimeout)
+ hasBadErrorCode(metadata.streamMetadata) || ((time - metadata.lastRefreshMs) > cacheTimeout)
})
val topicsToRefresh = missingTopics ++ topicsWithBadOrExpiredMetadata
@@ -67,4 +67,12 @@ object TopicMetadataCache extends Logging {
def clear {
topicMetadataMap.clear
}
+
+ /**
+ * Helper method to check if a topic's metadata, or the metadata of any of
+ * its partitions, has a bad errorCode as defined by KafkaUtil.isBadErrorCode.
+ */
+ def hasBadErrorCode(streamMetadata: TopicMetadata) = {
+ KafkaUtil.isBadErrorCode(streamMetadata.errorCode) || streamMetadata.partitionsMetadata.exists(partitionMetadata => KafkaUtil.isBadErrorCode(partitionMetadata.errorCode))
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index 2482f23..a7a095b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -19,15 +19,16 @@
package org.apache.samza.util
-import org.apache.samza.config.{Config, ConfigException}
+import java.util.concurrent.atomic.AtomicLong
+import org.apache.kafka.common.PartitionInfo
+import org.apache.samza.config.Config
+import org.apache.samza.config.ConfigException
import org.apache.samza.config.JobConfig.Config2Job
-import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import org.apache.samza.system.OutgoingMessageEnvelope
-import org.apache.kafka.common.utils.Utils
-import java.util.Random
-import org.apache.kafka.common.PartitionInfo
+import kafka.common.ErrorMapping
+import kafka.common.ReplicaNotAvailableException
-object KafkaUtil {
+object KafkaUtil extends Logging {
val counter = new AtomicLong(0)
def getClientId(id: String, config: Config): String = getClientId(
@@ -43,10 +44,34 @@ object KafkaUtil {
System.currentTimeMillis,
counter.getAndIncrement)
- private def abs(n: Int) = if(n == Integer.MIN_VALUE) 0 else math.abs(n)
+ private def abs(n: Int) = if (n == Integer.MIN_VALUE) 0 else math.abs(n)
def getIntegerPartitionKey(envelope: OutgoingMessageEnvelope, partitions: java.util.List[PartitionInfo]): Integer = {
val numPartitions = partitions.size
abs(envelope.getPartitionKey.hashCode()) % numPartitions
}
+
+ /**
+ * Behaves exactly like Kafka's ErrorMapping.maybeThrowException, except
+ * that it suppresses ReplicaNotAvailableException.
+ * According to the Kafka
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol">protocol
+ * docs</a>, ReplicaNotAvailableException can be safely ignored.
+ */
+ def maybeThrowException(code: Short) {
+ try {
+ ErrorMapping.maybeThrowException(code)
+ } catch {
+ case e: ReplicaNotAvailableException =>
+ debug("Got ReplicaNotAvailableException, but ignoring since it's safe to do so.")
+ }
+ }
+
+ /**
+ * Checks if a Kafka errorCode is "bad" or not. "Bad" is defined as any
+ * errorCode that's not NoError and also not ReplicaNotAvailableCode.
+ */
+ def isBadErrorCode(code: Short) = {
+ code != ErrorMapping.NoError && code != ErrorMapping.ReplicaNotAvailableCode
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index 3d1e6ec..0380d35 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -22,9 +22,7 @@
package org.apache.samza.system.kafka
import java.util.Properties
-
import kafka.admin.AdminUtils
-import kafka.common.ErrorMapping
import kafka.consumer.Consumer
import kafka.consumer.ConsumerConfig
import kafka.server.KafkaConfig
@@ -34,7 +32,6 @@ import kafka.utils.TestZKUtils
import kafka.utils.Utils
import kafka.utils.ZKStringSerializer
import kafka.zk.EmbeddedZookeeper
-
import org.I0Itec.zkclient.ZkClient
import org.apache.samza.Partition
import org.apache.samza.system.SystemStreamMetadata
@@ -45,11 +42,12 @@ import org.apache.samza.util.ClientUtilTopicMetadataStore
import org.apache.samza.util.TopicMetadataStore
import org.junit.Assert._
import org.junit.{Test, BeforeClass, AfterClass}
-
import scala.collection.JavaConversions._
import org.apache.samza.config.KafkaProducerConfig
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
import java.util
+import kafka.common.ErrorMapping
+import org.apache.samza.util.KafkaUtil
object TestKafkaSystemAdmin {
val TOPIC = "input"
@@ -113,7 +111,7 @@ object TestKafkaSystemAdmin {
val topicMetadata = topicMetadataMap(TOPIC)
val errorCode = topicMetadata.errorCode
- ErrorMapping.maybeThrowException(errorCode)
+ KafkaUtil.maybeThrowException(errorCode)
done = expectedPartitionCount == topicMetadata.partitionsMetadata.size
} catch {
http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
index e698d2f..50c13ab 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
@@ -21,15 +21,15 @@ package org.apache.samza.system.kafka
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
-
import kafka.api.TopicMetadata
-
import org.I0Itec.zkclient.ZkClient
import org.apache.samza.util.Clock
import org.apache.samza.util.TopicMetadataStore
import org.junit.Assert._
import org.junit.Before
import org.junit.Test
+import kafka.common.ErrorMapping
+import kafka.api.PartitionMetadata
class TestTopicMetadataCache {
@@ -124,4 +124,14 @@ class TestTopicMetadataCache {
assertTrue(numAssertionSuccess.get())
assertEquals(1, mockStore.numberOfCalls.get())
}
+
+ @Test
+ def testBadErrorCodes {
+ val partitionMetadataBad = new PartitionMetadata(0, None, Seq(), errorCode = ErrorMapping.LeaderNotAvailableCode)
+ val partitionMetadataGood = new PartitionMetadata(0, None, Seq(), errorCode = ErrorMapping.NoError)
+ assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, ErrorMapping.RequestTimedOutCode)))
+ assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataBad), ErrorMapping.NoError)))
+ assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, ErrorMapping.NoError)))
+ assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataGood), ErrorMapping.NoError)))
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala b/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala
new file mode 100644
index 0000000..848cfc8
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.utils
+
+import org.junit.Test
+import org.scalatest.{ Matchers => ScalaTestMatchers }
+import org.apache.samza.util.KafkaUtil
+import kafka.common.ErrorMapping
+
+class TestKafkaUtil extends ScalaTestMatchers {
+ @Test
+ def testMaybeThrowException {
+ intercept[Exception] { KafkaUtil.maybeThrowException(ErrorMapping.UnknownTopicOrPartitionCode) }
+ KafkaUtil.maybeThrowException(ErrorMapping.ReplicaNotAvailableCode)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index a8b724b..d66b3bd 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -22,9 +22,7 @@ package org.apache.samza.test.integration
import java.util.Properties
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
-
import kafka.admin.AdminUtils
-import kafka.common.ErrorMapping
import kafka.consumer.Consumer
import kafka.consumer.ConsumerConfig
import kafka.message.MessageAndMetadata
@@ -35,7 +33,6 @@ import kafka.utils.TestZKUtils
import kafka.utils.Utils
import kafka.utils.ZKStringSerializer
import kafka.zk.EmbeddedZookeeper
-
import org.I0Itec.zkclient.ZkClient
import org.apache.samza.Partition
import org.apache.samza.checkpoint.Checkpoint
@@ -59,14 +56,13 @@ import org.apache.samza.util.ClientUtilTopicMetadataStore
import org.apache.samza.util.TopicMetadataStore
import org.junit.Assert._
import org.junit.{BeforeClass, AfterClass, Test}
-
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.SynchronizedMap
import org.apache.kafka.clients.producer.{ProducerConfig, Producer, ProducerRecord, KafkaProducer}
import java.util
-
+import org.apache.samza.util.KafkaUtil
object TestStatefulTask {
val INPUT_TOPIC = "input"
@@ -145,7 +141,7 @@ object TestStatefulTask {
val topicMetadata = topicMetadataMap(topic)
val errorCode = topicMetadata.errorCode
- ErrorMapping.maybeThrowException(errorCode)
+ KafkaUtil.maybeThrowException(errorCode)
})
done = true