Posted to commits@samza.apache.org by cr...@apache.org on 2015/04/08 01:25:43 UTC

[11/36] samza git commit: SAMZA-592; ignore replica not available exceptions, and be more aggressive about refreshing kafka topic metadata

SAMZA-592; ignore replica not available exceptions, and be more aggressive about refreshing kafka topic metadata


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1202df41
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1202df41
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1202df41

Branch: refs/heads/samza-sql
Commit: 1202df4171f44a877997301218fe3c3037abf2d5
Parents: 23c4b39
Author: Chris Riccomini <cr...@apache.org>
Authored: Fri Mar 13 14:52:03 2015 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Fri Mar 13 14:52:03 2015 -0700

----------------------------------------------------------------------
 .../kafka/KafkaCheckpointManager.scala          |  7 ++--
 .../apache/samza/system/kafka/BrokerProxy.scala |  3 +-
 .../apache/samza/system/kafka/GetOffset.scala   |  7 ++--
 .../samza/system/kafka/KafkaSystemAdmin.scala   | 10 ++---
 .../samza/system/kafka/TopicMetadataCache.scala | 12 +++++-
 .../scala/org/apache/samza/util/KafkaUtil.scala | 39 ++++++++++++++++----
 .../system/kafka/TestKafkaSystemAdmin.scala     |  8 ++--
 .../system/kafka/TestTopicMetadataCache.scala   | 14 ++++++-
 .../org/apache/samza/utils/TestKafkaUtil.scala  | 33 +++++++++++++++++
 .../test/integration/TestStatefulTask.scala     |  8 +---
 10 files changed, 107 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
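
In short: every call site that previously invoked Kafka's ErrorMapping.maybeThrowException now goes through a new KafkaUtil.maybeThrowException wrapper that swallows ReplicaNotAvailableException (which the Kafka protocol guide says can be safely ignored), and TopicMetadataCache now also refreshes any cached metadata whose topic- or partition-level error code is anything else. As a condensed, standalone sketch of that wrapper pattern (the real code is in the KafkaUtil.scala hunk below; the object name here is only a stand-in):

    import kafka.common.{ErrorMapping, ReplicaNotAvailableException}

    object ErrorCodeHandlingSketch {
      // Delegate to Kafka's standard error mapping, but swallow
      // ReplicaNotAvailableException, which the protocol docs mark as ignorable.
      def maybeThrowException(code: Short): Unit =
        try ErrorMapping.maybeThrowException(code)
        catch { case _: ReplicaNotAvailableException => () }

      // Anything other than NoError / ReplicaNotAvailableCode is "bad" and
      // should force a topic metadata refresh.
      def isBadErrorCode(code: Short): Boolean =
        code != ErrorMapping.NoError && code != ErrorMapping.ReplicaNotAvailableCode
    }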


http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 4a1b31f..c9504ec 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -44,6 +44,7 @@ import org.apache.samza.util.TopicMetadataStore
 import scala.collection.mutable
 import java.util.Properties
 import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
+import org.apache.samza.util.KafkaUtil
 
 /**
  * Kafka checkpoint manager is used to store checkpoints in a Kafka topic.
@@ -159,7 +160,7 @@ class KafkaCheckpointManager(
       .get(topicAndPartition)
       .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:0" format checkpointTopic))
     // Fail or retry if there was an issue with the offset request.
-    ErrorMapping.maybeThrowException(offsetResponse.error)
+    KafkaUtil.maybeThrowException(offsetResponse.error)
 
     val offset: Long = offsetResponse
       .offsets
@@ -287,7 +288,7 @@ class KafkaCheckpointManager(
                 warn("Got an offset out of range exception while getting last entry in %s for topic %s and partition 0, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (entryType, checkpointTopic))
                 return
               }
-              ErrorMapping.maybeThrowException(errorCode)
+              KafkaUtil.maybeThrowException(errorCode)
             }
 
             for (response <- fetchResponse.messageSet(checkpointTopic, 0)) {
@@ -385,7 +386,7 @@ class KafkaCheckpointManager(
       loop => {
         val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, metadataStore.getTopicInfo)
         val topicMetadata = topicMetadataMap(checkpointTopic)
-        ErrorMapping.maybeThrowException(topicMetadata.errorCode)
+        KafkaUtil.maybeThrowException(topicMetadata.errorCode)
 
         val partitionCount = topicMetadata.partitionsMetadata.length
         if (partitionCount != 1) {

http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index c6e231a..614f33f 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -34,6 +34,7 @@ import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX
 import scala.collection.JavaConversions._
 import scala.collection.concurrent
 import scala.collection.mutable
+import org.apache.samza.util.KafkaUtil
 
 /**
  *  Companion object for class JvmMetrics encapsulating various constants
@@ -235,7 +236,7 @@ class BrokerProxy(
     // handle the recoverable errors.
     remainingErrors.foreach(e => {
       warn("Got non-recoverable error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s" format remainingErrors.mkString(","))
-      ErrorMapping.maybeThrowException(e.code) })
+      KafkaUtil.maybeThrowException(e.code) })
 
     notLeaderOrUnknownTopic.foreach(e => abdicate(e.tp))
 

http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
index 147aabc..5528702 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
@@ -21,12 +21,13 @@
 
 package org.apache.samza.system.kafka
 
-import kafka.common.{ OffsetOutOfRangeException, ErrorMapping }
+import kafka.common.OffsetOutOfRangeException
 import kafka.api._
 import kafka.common.TopicAndPartition
 import kafka.api.PartitionOffsetRequestInfo
 import org.apache.samza.util.Logging
 import kafka.message.MessageAndOffset
+import org.apache.samza.util.KafkaUtil
 
 /**
  * GetOffset validates offsets for topic partitions, and manages fetching new
@@ -59,7 +60,7 @@ class GetOffset(
       val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
 
       if (messages.hasError) {
-        ErrorMapping.maybeThrowException(messages.errorCode(topicAndPartition.topic, topicAndPartition.partition))
+        KafkaUtil.maybeThrowException(messages.errorCode(topicAndPartition.topic, topicAndPartition.partition))
       }
 
       info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
@@ -85,7 +86,7 @@ class GetOffset(
       .get(topicAndPartition)
       .getOrElse(toss("Unable to find offset information for %s" format topicAndPartition))
 
-    ErrorMapping.maybeThrowException(partitionOffsetResponse.error)
+    KafkaUtil.maybeThrowException(partitionOffsetResponse.error)
 
     partitionOffsetResponse
       .offsets

http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index b790be1..f783c57 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -28,12 +28,13 @@ import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, Logging}
 import kafka.api._
 import kafka.consumer.SimpleConsumer
-import kafka.common.{TopicExistsException, TopicAndPartition, ErrorMapping}
+import kafka.common.{TopicExistsException, TopicAndPartition}
 import java.util.{Properties, UUID}
 import scala.collection.JavaConversions._
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import kafka.consumer.ConsumerConfig
 import kafka.admin.AdminUtils
+import org.apache.samza.util.KafkaUtil
 
 object KafkaSystemAdmin extends Logging {
   /**
@@ -222,12 +223,11 @@ class KafkaSystemAdmin(
       .values
       // Convert the topic metadata to a Seq[(Broker, TopicAndPartition)] 
       .flatMap(topicMetadata => {
-        ErrorMapping.maybeThrowException(topicMetadata.errorCode)
+        KafkaUtil.maybeThrowException(topicMetadata.errorCode)
         topicMetadata
           .partitionsMetadata
           // Convert Seq[PartitionMetadata] to Seq[(Broker, TopicAndPartition)]
           .map(partitionMetadata => {
-            ErrorMapping.maybeThrowException(partitionMetadata.errorCode)
             val topicAndPartition = new TopicAndPartition(topicMetadata.topic, partitionMetadata.partitionId)
             val leader = partitionMetadata
               .leader
@@ -263,7 +263,7 @@ class KafkaSystemAdmin(
       .getOffsetsBefore(new OffsetRequest(partitionOffsetInfo))
       .partitionErrorAndOffsets
       .mapValues(partitionErrorAndOffset => {
-        ErrorMapping.maybeThrowException(partitionErrorAndOffset.error)
+        KafkaUtil.maybeThrowException(partitionErrorAndOffset.error)
         partitionErrorAndOffset.offsets.head
       })
 
@@ -320,7 +320,7 @@ class KafkaSystemAdmin(
         val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout)
         val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo)
         val topicMetadata = topicMetadataMap(topicName)
-        ErrorMapping.maybeThrowException(topicMetadata.errorCode)
+        KafkaUtil.maybeThrowException(topicMetadata.errorCode)
 
         val partitionCount = topicMetadata.partitionsMetadata.length
         if (partitionCount < numKafkaChangelogPartitions) {

http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
index 4a49d22..82ecf1a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
@@ -21,7 +21,7 @@ package org.apache.samza.system.kafka
 
 import org.apache.samza.util.Logging
 import kafka.api.TopicMetadata
-import kafka.common.ErrorMapping
+import org.apache.samza.util.KafkaUtil
 
 /**
  * TopicMetadataCache is used to cache all the topic metadata for Kafka per
@@ -43,7 +43,7 @@ object TopicMetadataCache extends Logging {
       val missingTopics = topics.filter(topic => !topicMetadataMap.contains(systemName, topic))
       val topicsWithBadOrExpiredMetadata = (topics -- missingTopics).filter(topic => {
         val metadata = topicMetadataMap(systemName, topic)
-        metadata.streamMetadata.errorCode != ErrorMapping.NoError || ((time - metadata.lastRefreshMs) > cacheTimeout)
+        hasBadErrorCode(metadata.streamMetadata) || ((time - metadata.lastRefreshMs) > cacheTimeout)
       })
       val topicsToRefresh = missingTopics ++ topicsWithBadOrExpiredMetadata
 
@@ -67,4 +67,12 @@ object TopicMetadataCache extends Logging {
   def clear {
     topicMetadataMap.clear
   }
+
+  /**
+   * Helper method to check if a topic's metadata has a bad errorCode, or if a
+   * partition's metadata has a bad errorCode.
+   */
+  def hasBadErrorCode(streamMetadata: TopicMetadata) = {
+    KafkaUtil.isBadErrorCode(streamMetadata.errorCode) || streamMetadata.partitionsMetadata.exists(partitionMetadata => KafkaUtil.isBadErrorCode(partitionMetadata.errorCode))
+  }
 }
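
Concretely, the cache's refresh decision now combines the new error-code check with the existing age check: an entry is re-fetched when the topic or any of its partitions reports a bad error code, or when the entry is older than the cache timeout. A minimal sketch of that predicate, assuming a hypothetical CachedMetadata stand-in for the cache's internal entry type (not shown in this hunk), whose fields mirror the ones the real code reads:

    import kafka.api.TopicMetadata
    import org.apache.samza.system.kafka.TopicMetadataCache

    // Hypothetical stand-in for the cache's entry type; the real cache stores
    // the fetched TopicMetadata plus the time it was last refreshed.
    case class CachedMetadata(streamMetadata: TopicMetadata, lastRefreshMs: Long)

    // Refresh when the topic or any of its partitions reports a "bad" error
    // code, or when the entry is older than the configured cache timeout.
    def needsRefresh(entry: CachedMetadata, nowMs: Long, cacheTimeoutMs: Long): Boolean =
      TopicMetadataCache.hasBadErrorCode(entry.streamMetadata) ||
        (nowMs - entry.lastRefreshMs) > cacheTimeoutMs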

http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index 2482f23..a7a095b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -19,15 +19,16 @@
 
 package org.apache.samza.util
 
-import org.apache.samza.config.{Config, ConfigException}
+import java.util.concurrent.atomic.AtomicLong
+import org.apache.kafka.common.PartitionInfo
+import org.apache.samza.config.Config
+import org.apache.samza.config.ConfigException
 import org.apache.samza.config.JobConfig.Config2Job
-import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
 import org.apache.samza.system.OutgoingMessageEnvelope
-import org.apache.kafka.common.utils.Utils
-import java.util.Random
-import org.apache.kafka.common.PartitionInfo
+import kafka.common.ErrorMapping
+import kafka.common.ReplicaNotAvailableException
 
-object KafkaUtil {
+object KafkaUtil extends Logging {
   val counter = new AtomicLong(0)
 
   def getClientId(id: String, config: Config): String = getClientId(
@@ -43,10 +44,34 @@ object KafkaUtil {
         System.currentTimeMillis,
         counter.getAndIncrement)
 
-  private def abs(n: Int) = if(n == Integer.MIN_VALUE) 0 else math.abs(n)
+  private def abs(n: Int) = if (n == Integer.MIN_VALUE) 0 else math.abs(n)
 
   def getIntegerPartitionKey(envelope: OutgoingMessageEnvelope, partitions: java.util.List[PartitionInfo]): Integer = {
     val numPartitions = partitions.size
     abs(envelope.getPartitionKey.hashCode()) % numPartitions
   }
+
+  /**
+   * Exactly the same as Kafka's ErrorMapping.maybeThrowException
+   * implementation, except suppresses ReplicaNotAvailableException exceptions.
+   * According to the Kafka
+   * <a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol">protocol
+   * docs</a>, ReplicaNotAvailableException can be safely ignored.
+   */
+  def maybeThrowException(code: Short) {
+    try {
+      ErrorMapping.maybeThrowException(code)
+    } catch {
+      case e: ReplicaNotAvailableException =>
+        debug("Got ReplicaNotAvailableException, but ignoring since it's safe to do so.")
+    }
+  }
+
+  /**
+   * Checks if a Kafka errorCode is "bad" or not. "Bad" is defined as any
+   * errorCode that's not NoError and also not ReplicaNotAvailableCode.
+   */
+  def isBadErrorCode(code: Short) = {
+    code != ErrorMapping.NoError && code != ErrorMapping.ReplicaNotAvailableCode
+  }
 }
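
The behavioral contract, which the new TestKafkaUtil further down asserts: ReplicaNotAvailableCode is now a silent no-op (logged at debug), while every other error code still throws exactly as ErrorMapping.maybeThrowException would. A quick illustration, assuming the samza-kafka module and its Kafka dependency are on the classpath:

    import kafka.common.ErrorMapping
    import org.apache.samza.util.KafkaUtil

    object MaybeThrowExceptionExample extends App {
      // Ignored: ReplicaNotAvailable is informational per the Kafka protocol docs.
      KafkaUtil.maybeThrowException(ErrorMapping.ReplicaNotAvailableCode)

      // Still throws, exactly as ErrorMapping.maybeThrowException would.
      KafkaUtil.maybeThrowException(ErrorMapping.UnknownTopicOrPartitionCode)
    }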

http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index 3d1e6ec..0380d35 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -22,9 +22,7 @@
 package org.apache.samza.system.kafka
 
 import java.util.Properties
-
 import kafka.admin.AdminUtils
-import kafka.common.ErrorMapping
 import kafka.consumer.Consumer
 import kafka.consumer.ConsumerConfig
 import kafka.server.KafkaConfig
@@ -34,7 +32,6 @@ import kafka.utils.TestZKUtils
 import kafka.utils.Utils
 import kafka.utils.ZKStringSerializer
 import kafka.zk.EmbeddedZookeeper
-
 import org.I0Itec.zkclient.ZkClient
 import org.apache.samza.Partition
 import org.apache.samza.system.SystemStreamMetadata
@@ -45,11 +42,12 @@ import org.apache.samza.util.ClientUtilTopicMetadataStore
 import org.apache.samza.util.TopicMetadataStore
 import org.junit.Assert._
 import org.junit.{Test, BeforeClass, AfterClass}
-
 import scala.collection.JavaConversions._
 import org.apache.samza.config.KafkaProducerConfig
 import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
 import java.util
+import kafka.common.ErrorMapping
+import org.apache.samza.util.KafkaUtil
 
 object TestKafkaSystemAdmin {
   val TOPIC = "input"
@@ -113,7 +111,7 @@ object TestKafkaSystemAdmin {
         val topicMetadata = topicMetadataMap(TOPIC)
         val errorCode = topicMetadata.errorCode
 
-        ErrorMapping.maybeThrowException(errorCode)
+        KafkaUtil.maybeThrowException(errorCode)
 
         done = expectedPartitionCount == topicMetadata.partitionsMetadata.size
       } catch {

http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
index e698d2f..50c13ab 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
@@ -21,15 +21,15 @@ package org.apache.samza.system.kafka
 
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
-
 import kafka.api.TopicMetadata
-
 import org.I0Itec.zkclient.ZkClient
 import org.apache.samza.util.Clock
 import org.apache.samza.util.TopicMetadataStore
 import org.junit.Assert._
 import org.junit.Before
 import org.junit.Test
+import kafka.common.ErrorMapping
+import kafka.api.PartitionMetadata
 
 class TestTopicMetadataCache {
 
@@ -124,4 +124,14 @@ class TestTopicMetadataCache {
     assertTrue(numAssertionSuccess.get())
     assertEquals(1, mockStore.numberOfCalls.get())
   }
+
+  @Test
+  def testBadErrorCodes {
+    val partitionMetadataBad = new PartitionMetadata(0, None, Seq(), errorCode = ErrorMapping.LeaderNotAvailableCode)
+    val partitionMetadataGood = new PartitionMetadata(0, None, Seq(), errorCode = ErrorMapping.NoError)
+    assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, ErrorMapping.RequestTimedOutCode)))
+    assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataBad), ErrorMapping.NoError)))
+    assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, ErrorMapping.NoError)))
+    assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataGood), ErrorMapping.NoError)))
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala b/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala
new file mode 100644
index 0000000..848cfc8
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.utils
+
+import org.junit.Test
+import org.scalatest.{ Matchers => ScalaTestMatchers }
+import org.apache.samza.util.KafkaUtil
+import kafka.common.ErrorMapping
+
+class TestKafkaUtil extends ScalaTestMatchers {
+  @Test
+  def testMaybeThrowException {
+    intercept[Exception] { KafkaUtil.maybeThrowException(ErrorMapping.UnknownTopicOrPartitionCode) }
+    KafkaUtil.maybeThrowException(ErrorMapping.ReplicaNotAvailableCode)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index a8b724b..d66b3bd 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -22,9 +22,7 @@ package org.apache.samza.test.integration
 import java.util.Properties
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.TimeUnit
-
 import kafka.admin.AdminUtils
-import kafka.common.ErrorMapping
 import kafka.consumer.Consumer
 import kafka.consumer.ConsumerConfig
 import kafka.message.MessageAndMetadata
@@ -35,7 +33,6 @@ import kafka.utils.TestZKUtils
 import kafka.utils.Utils
 import kafka.utils.ZKStringSerializer
 import kafka.zk.EmbeddedZookeeper
-
 import org.I0Itec.zkclient.ZkClient
 import org.apache.samza.Partition
 import org.apache.samza.checkpoint.Checkpoint
@@ -59,14 +56,13 @@ import org.apache.samza.util.ClientUtilTopicMetadataStore
 import org.apache.samza.util.TopicMetadataStore
 import org.junit.Assert._
 import org.junit.{BeforeClass, AfterClass, Test}
-
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.SynchronizedMap
 import org.apache.kafka.clients.producer.{ProducerConfig, Producer, ProducerRecord, KafkaProducer}
 import java.util
-
+import org.apache.samza.util.KafkaUtil
 
 object TestStatefulTask {
   val INPUT_TOPIC = "input"
@@ -145,7 +141,7 @@ object TestStatefulTask {
           val topicMetadata = topicMetadataMap(topic)
           val errorCode = topicMetadata.errorCode
 
-          ErrorMapping.maybeThrowException(errorCode)
+          KafkaUtil.maybeThrowException(errorCode)
         })
 
         done = true