You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/01/10 18:52:46 UTC

samza git commit: SAMZA-1530; Bump up Kafka dependency to 0.11

Repository: samza
Updated Branches:
  refs/heads/master 07f28e948 -> a6540b4e3


SAMZA-1530; Bump up Kafka dependency to 0.11

Author: Dong Lin <li...@gmail.com>

Reviewers: Xinyu Liu <xi...@gmail.com>

Closes #395 from lindong28/SAMZA-1530


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a6540b4e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a6540b4e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a6540b4e

Branch: refs/heads/master
Commit: a6540b4e3d1d3916210c50be9b4b2b4920f885bb
Parents: 07f28e9
Author: Dong Lin <li...@gmail.com>
Authored: Wed Jan 10 10:52:38 2018 -0800
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Wed Jan 10 10:52:38 2018 -0800

----------------------------------------------------------------------
 bin/check-all.sh                                |  2 +-
 gradle/dependency-versions.gradle               |  2 +-
 .../apache/samza/system/kafka/BrokerProxy.scala | 13 ++++----
 .../apache/samza/system/kafka/GetOffset.scala   |  4 +--
 .../samza/system/kafka/KafkaSystemAdmin.scala   |  8 ++---
 .../samza/system/kafka/TopicMetadataCache.scala |  2 +-
 .../scala/org/apache/samza/util/KafkaUtil.scala |  8 +++--
 .../samza/system/kafka/MockKafkaProducer.java   | 25 +++++++++++++--
 .../kafka/TestKafkaCheckpointManager.scala      |  7 ++---
 .../samza/system/kafka/TestBrokerProxy.scala    |  9 +++---
 .../system/kafka/TestKafkaSystemAdmin.scala     | 26 ++++++----------
 .../system/kafka/TestKafkaSystemConsumer.scala  |  4 +--
 .../system/kafka/TestTopicMetadataCache.scala   | 32 +++++++++++---------
 .../org/apache/samza/utils/TestKafkaUtil.scala  |  7 +++--
 .../processor/TestZkLocalApplicationRunner.java |  2 +-
 .../test/integration/StreamTaskTestUtil.scala   |  7 ++---
 16 files changed, 86 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/bin/check-all.sh
----------------------------------------------------------------------
diff --git a/bin/check-all.sh b/bin/check-all.sh
index 2f9f03c..f168bc8 100755
--- a/bin/check-all.sh
+++ b/bin/check-all.sh
@@ -21,7 +21,7 @@
 
 set -e
 
-SCALAs=( "2.10" "2.11" "2.12" )
+SCALAs=( "2.11" "2.12" )
 JDKs=( "JAVA8_HOME" )
 YARNs=( "2.6.1" "2.7.1" )
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 20a1d56..2e45914 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -33,7 +33,7 @@
   jodaTimeVersion = "2.2"
   joptSimpleVersion = "3.2"
   junitVersion = "4.8.1"
-  kafkaVersion = "0.10.1.1"
+  kafkaVersion = "0.11.0.2"
   log4jVersion = "1.2.17"
   metricsVersion = "2.2.0"
   mockitoVersion = "1.10.19"

http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 5338886..8a6618d 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -182,7 +182,7 @@ class BrokerProxy(
       firstCallBarrier.countDown()
 
       // Split response into errors and non errors, processing the errors first
-      val (nonErrorResponses, errorResponses) = response.data.toSet.partition(_._2.error == ErrorMapping.NoError)
+      val (nonErrorResponses, errorResponses) = response.data.toSet.partition(_._2.error.code() == ErrorMapping.NoError)
 
       handleErrors(errorResponses, response)
 
@@ -219,18 +219,17 @@ class BrokerProxy(
     immutableNextOffsetsCopy.keySet.foreach(abdicate(_))
   }
 
-  def handleErrors(errorResponses: Set[(TopicAndPartition, FetchResponsePartitionData)], response:FetchResponse) = {
+  def handleErrors(errorResponses: Set[(TopicAndPartition, FetchResponsePartitionData)], response: FetchResponse) = {
     // FetchResponse should really return Option and a list of the errors so we don't have to find them ourselves
-    case class Error(tp: TopicAndPartition, code: Short, exception: Throwable)
+    case class Error(tp: TopicAndPartition, code: Short, exception: Exception)
 
     // Now subdivide the errors into three types: non-recoverable, not leader (== abdicate) and offset out of range (== get new offset)
 
     // Convert FetchResponse into easier-to-work-with Errors
     val errors = for (
       (topicAndPartition, responseData) <- errorResponses;
-      errorCode <- Option(response.errorCode(topicAndPartition.topic, topicAndPartition.partition)); // Scala's being cranky about referring to error.getKey values...
-      exception <- Option(ErrorMapping.exceptionFor(errorCode))
-    ) yield new Error(topicAndPartition, errorCode, exception)
+      error <- Option(response.error(topicAndPartition.topic, topicAndPartition.partition)) // Scala's being cranky about referring to error.getKey values...
+    ) yield new Error(topicAndPartition, error.code(), error.exception())
 
     val (notLeaderOrUnknownTopic, otherErrors) = errors.partition { case (e) => e.code == ErrorMapping.NotLeaderForPartitionCode || e.code == ErrorMapping.UnknownTopicOrPartitionCode }
     val (offsetOutOfRangeErrors, remainingErrors) = otherErrors.partition(_.code == ErrorMapping.OffsetOutOfRangeCode)
@@ -241,7 +240,7 @@ class BrokerProxy(
     // handle the recoverable errors.
     remainingErrors.foreach(e => {
       warn("Got non-recoverable error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s" format remainingErrors.mkString(","))
-      KafkaUtil.maybeThrowException(e.code) })
+      KafkaUtil.maybeThrowException(e.exception) })
 
     notLeaderOrUnknownTopic.foreach(e => abdicate(e.tp))
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
index 5528702..040e246 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
@@ -60,7 +60,7 @@ class GetOffset(
       val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
 
       if (messages.hasError) {
-        KafkaUtil.maybeThrowException(messages.errorCode(topicAndPartition.topic, topicAndPartition.partition))
+        KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception())
       }
 
       info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
@@ -86,7 +86,7 @@ class GetOffset(
       .get(topicAndPartition)
       .getOrElse(toss("Unable to find offset information for %s" format topicAndPartition))
 
-    KafkaUtil.maybeThrowException(partitionOffsetResponse.error)
+    KafkaUtil.maybeThrowException(partitionOffsetResponse.error.exception())
 
     partitionOffsetResponse
       .offsets

http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 013b292..4715141 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -164,7 +164,7 @@ class KafkaSystemAdmin(
           metadataTTL)
         val result = metadata.map {
           case (topic, topicMetadata) => {
-            KafkaUtil.maybeThrowException(topicMetadata.errorCode)
+            KafkaUtil.maybeThrowException(topicMetadata.error.exception())
             val partitionsMap = topicMetadata.partitionsMetadata.map {
               pm =>
                 new Partition(pm.partitionId) -> new SystemStreamPartitionMetadata("", "", "")
@@ -350,7 +350,7 @@ class KafkaSystemAdmin(
       .values
       // Convert the topic metadata to a Seq[(Broker, TopicAndPartition)]
       .flatMap(topicMetadata => {
-        KafkaUtil.maybeThrowException(topicMetadata.errorCode)
+        KafkaUtil.maybeThrowException(topicMetadata.error.exception())
         topicMetadata
           .partitionsMetadata
           // Convert Seq[PartitionMetadata] to Seq[(Broker, TopicAndPartition)]
@@ -390,7 +390,7 @@ class KafkaSystemAdmin(
       .getOffsetsBefore(new OffsetRequest(partitionOffsetInfo))
       .partitionErrorAndOffsets
       .mapValues(partitionErrorAndOffset => {
-        KafkaUtil.maybeThrowException(partitionErrorAndOffset.error)
+        KafkaUtil.maybeThrowException(partitionErrorAndOffset.error.exception())
         partitionErrorAndOffset.offsets.head
       })
 
@@ -480,7 +480,7 @@ class KafkaSystemAdmin(
         val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout)
         val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo, metadataTTL)
         val topicMetadata = topicMetadataMap(topicName)
-        KafkaUtil.maybeThrowException(topicMetadata.errorCode)
+        KafkaUtil.maybeThrowException(topicMetadata.error.exception())
 
         val partitionCount = topicMetadata.partitionsMetadata.length
         if (partitionCount != spec.getPartitionCount) {

http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
index 82ecf1a..8a3ab2b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
@@ -73,6 +73,6 @@ object TopicMetadataCache extends Logging {
    * partition's metadata has a bad errorCode.
    */
   def hasBadErrorCode(streamMetadata: TopicMetadata) = {
-    KafkaUtil.isBadErrorCode(streamMetadata.errorCode) || streamMetadata.partitionsMetadata.exists(partitionMetadata => KafkaUtil.isBadErrorCode(partitionMetadata.errorCode))
+    KafkaUtil.isBadErrorCode(streamMetadata.error.code()) || streamMetadata.partitionsMetadata.exists(partitionMetadata => KafkaUtil.isBadErrorCode(partitionMetadata.error.code()))
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index 1410cbb..5b0137a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -29,7 +29,8 @@ import org.apache.samza.config.{ApplicationConfig, Config, ConfigException}
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.execution.StreamManager
 import org.apache.samza.system.OutgoingMessageEnvelope
-import kafka.common.{ErrorMapping, ReplicaNotAvailableException}
+import org.apache.kafka.common.errors.ReplicaNotAvailableException
+import kafka.common.ErrorMapping
 import org.apache.kafka.common.errors.TopicExistsException
 import org.apache.samza.system.kafka.TopicMetadataCache
 
@@ -71,9 +72,10 @@ object KafkaUtil extends Logging {
    * <a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol">protocol
    * docs</a>, ReplicaNotAvailableException can be safely ignored.
    */
-  def maybeThrowException(code: Short) {
+  def maybeThrowException(e: Exception) {
     try {
-      ErrorMapping.maybeThrowException(code)
+      if (e != null)
+        throw e
     } catch {
       case e: ReplicaNotAvailableException =>
         debug("Got ReplicaNotAvailableException, but ignoring since it's safe to do so.")

http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
index 024c6e6..e66b7c3 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReferenceArray;
 import kafka.producer.ProducerClosedException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -42,6 +43,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.test.TestUtils;
 
 public class MockKafkaProducer implements Producer<byte[], byte[]> {
@@ -98,7 +100,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
   }
 
   private RecordMetadata getRecordMetadata(ProducerRecord record) {
-    return new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, this.msgsSent.get(), Record.NO_TIMESTAMP, -1, -1, -1);
+    return new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, this.msgsSent.get(), -1L, -1, -1, -1);
   }
 
   @Override
@@ -190,6 +192,25 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
     new FlushRunnable(0).run();
   }
 
+  public void initTransactions() {
+
+  }
+
+  public void abortTransaction() {
+
+  }
+
+  public void beginTransaction() {
+
+  }
+
+  public void commitTransaction() {
+
+  }
+
+  public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
+    
+  }
 
   private static class FutureFailure implements Future<RecordMetadata> {
 
@@ -232,7 +253,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
 
     public FutureSuccess(ProducerRecord record, int offset) {
       this.record = record;
-      this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset, Record.NO_TIMESTAMP, -1, -1, -1);
+      this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset, RecordBatch.NO_TIMESTAMP, -1, -1, -1);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index ec9f3a0..86cb418 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -52,15 +52,12 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
   val checkpoint1 = new Checkpoint(ImmutableMap.of(ssp, "offset-1"))
   val checkpoint2 = new Checkpoint(ImmutableMap.of(ssp, "offset-2"))
   val taskName = new TaskName("Partition 0")
-
-  var brokers: String = null
   var config: Config = null
 
   @Before
   override def setUp {
     super.setUp
     TestUtils.waitUntilTrue(() => servers.head.metadataCache.getAliveBrokers.size == numBrokers, "Wait for cache to update")
-    brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")
     config = getConfig()
   }
 
@@ -140,7 +137,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     val defaultSerializer = classOf[ByteArraySerializer].getCanonicalName
     val props = new Properties()
     props.putAll(ImmutableMap.of(
-      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers,
+      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList,
       ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, defaultSerializer,
       ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, defaultSerializer))
     props
@@ -151,7 +148,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
       .put(JobConfig.JOB_NAME, "some-job-name")
       .put(JobConfig.JOB_ID, "i001")
       .put(s"systems.$checkpointSystemName.samza.factory", classOf[KafkaSystemFactory].getCanonicalName)
-      .put(s"systems.$checkpointSystemName.producer.bootstrap.servers", brokers)
+      .put(s"systems.$checkpointSystemName.producer.bootstrap.servers", brokerList)
       .put(s"systems.$checkpointSystemName.consumer.zookeeper.connect", zkConnect)
       .put("task.checkpoint.system", checkpointSystemName)
       .build())

http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index f0bdafd..d510076 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -24,9 +24,10 @@ import java.nio.ByteBuffer
 import java.util.concurrent.CountDownLatch
 
 import kafka.api.{PartitionOffsetsResponse, _}
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.TopicAndPartition
 import kafka.consumer.SimpleConsumer
 import kafka.message.{ByteBufferMessageSet, Message, MessageAndOffset, MessageSet}
+import org.apache.kafka.common.protocol.Errors
 import org.apache.samza.SamzaException
 import org.apache.samza.util.Logging
 import org.junit.Assert._
@@ -165,7 +166,7 @@ class TestBrokerProxy extends Logging {
               messageSet
             }
 
-            val fetchResponsePartitionData = FetchResponsePartitionData(0, 500, messageSet)
+            val fetchResponsePartitionData = FetchResponsePartitionData(Errors.NONE, 500, messageSet)
             val map = scala.Predef.Map[TopicAndPartition, FetchResponsePartitionData](tp -> fetchResponsePartitionData)
 
             when(fetchResponse.data).thenReturn(map.toSeq)
@@ -257,12 +258,12 @@ class TestBrokerProxy extends Logging {
           }
           val mfr = mock(classOf[FetchResponse])
           when(mfr.hasError).thenReturn(true)
-          when(mfr.errorCode("topic", 42)).thenReturn(ErrorMapping.OffsetOutOfRangeCode)
+          when(mfr.error("topic", 42)).thenReturn(Errors.OFFSET_OUT_OF_RANGE)
 
           val messageSet = mock(classOf[MessageSet])
           when(messageSet.iterator).thenReturn(Iterator.empty)
           val response = mock(classOf[FetchResponsePartitionData])
-          when(response.error).thenReturn(ErrorMapping.OffsetOutOfRangeCode)
+          when(response.error).thenReturn(Errors.OFFSET_OUT_OF_RANGE)
           val responseMap = Map(tp -> response)
           when(mfr.data).thenReturn(responseMap.toSeq)
           invocationCount += 1

http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index 65c43f5..2039447 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -24,7 +24,8 @@ package org.apache.samza.system.kafka
 import java.util.{Properties, UUID}
 
 import kafka.admin.AdminUtils
-import kafka.common.{ErrorMapping, LeaderNotAvailableException}
+import org.apache.kafka.common.errors.LeaderNotAvailableException
+import org.apache.kafka.common.protocol.Errors
 import kafka.consumer.{Consumer, ConsumerConfig}
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
@@ -68,19 +69,13 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
   @BeforeClass
   override def setUp {
     super.setUp
-
     val config = new java.util.HashMap[String, String]()
-
-    brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")
-
-    config.put("bootstrap.servers", brokers)
+    config.put("bootstrap.servers", brokerList)
     config.put("acks", "all")
     config.put("serializer.class", "kafka.serializer.StringEncoder")
-
     producerConfig = new KafkaProducerConfig("kafka", "i001", config)
-
     producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
-    metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
+    metadataStore = new ClientUtilTopicMetadataStore(brokerList, "some-job-name")
   }
 
 
@@ -107,9 +102,8 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
       try {
         val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), SYSTEM, metadataStore.getTopicInfo)
         val topicMetadata = topicMetadataMap(topic)
-        val errorCode = topicMetadata.errorCode
 
-        KafkaUtil.maybeThrowException(errorCode)
+        KafkaUtil.maybeThrowException(topicMetadata.error.exception())
 
         done = expectedPartitionCount == topicMetadata.partitionsMetadata.size
       } catch {
@@ -137,11 +131,11 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
   }
 
   def createSystemAdmin: KafkaSystemAdmin = {
-    new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure))
+    new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure))
   }
 
   def createSystemAdmin(coordinatorStreamProperties: Properties, coordinatorStreamReplicationFactor: Int, topicMetaInformation: Map[String, ChangelogInfo]): KafkaSystemAdmin = {
-    new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties, coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation, Map())
+    new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties, coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation, Map())
   }
 
 }
@@ -281,7 +275,7 @@ class TestKafkaSystemAdmin {
   @Test
   def testShouldCreateCoordinatorStream {
     val topic = "test-coordinator-stream"
-    val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamReplicationFactor = 3)
+    val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokerList, () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamReplicationFactor = 3)
 
     val spec = StreamSpec.createCoordinatorStreamSpec(topic, "kafka")
     systemAdmin.createStream(spec)
@@ -294,14 +288,14 @@ class TestKafkaSystemAdmin {
     assertEquals(3, partitionMetadata.replicas.size)
   }
 
-  class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin(SYSTEM, brokers, () => ZkUtils(zkConnect, 6000, 6000, zkSecure)) {
+  class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin(SYSTEM, brokerList, () => ZkUtils(zkConnect, 6000, 6000, zkSecure)) {
     import kafka.api.TopicMetadata
     var metadataCallCount = 0
 
     // Simulate Kafka telling us that the leader for the topic is not available
     override def getTopicMetadata(topics: Set[String]) = {
       metadataCallCount += 1
-      val topicMetadata = TopicMetadata(topic = "quux", partitionsMetadata = Seq(), errorCode = ErrorMapping.LeaderNotAvailableCode)
+      val topicMetadata = TopicMetadata(topic = "quux", partitionsMetadata = Seq(), error = Errors.LEADER_NOT_AVAILABLE)
       Map("quux" -> topicMetadata)
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
index 8a5cbc2..4dd170f 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
@@ -25,7 +25,7 @@ import kafka.cluster.Broker
 import kafka.common.TopicAndPartition
 import kafka.message.Message
 import kafka.message.MessageAndOffset
-
+import org.apache.kafka.common.protocol.Errors
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.Partition
@@ -68,7 +68,7 @@ class TestKafkaSystemConsumer {
     // Lie and tell the store that the partition metadata is empty. We can't
     // use partition metadata because it has Broker in its constructor, which
     // is package private to Kafka.
-    val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, 0)))
+    val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, Errors.NONE)))
     var hosts = List[String]()
     var getHostPortCount = 0
     val consumer = new KafkaSystemConsumer(systemName, systemAdmin, metrics, metadataStore, clientId) {

http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
index 50c13ab..9cc2f63 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
@@ -20,7 +20,8 @@
 package org.apache.samza.system.kafka
 
 import java.util.concurrent.CountDownLatch
-import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+
 import kafka.api.TopicMetadata
 import org.I0Itec.zkclient.ZkClient
 import org.apache.samza.util.Clock
@@ -30,6 +31,7 @@ import org.junit.Before
 import org.junit.Test
 import kafka.common.ErrorMapping
 import kafka.api.PartitionMetadata
+import org.apache.kafka.common.protocol.Errors
 
 class TestTopicMetadataCache {
 
@@ -41,8 +43,8 @@ class TestTopicMetadataCache {
 
   class MockTopicMetadataStore extends TopicMetadataStore {
     var mockCache = Map(
-      "topic1" -> new TopicMetadata("topic1", List.empty, 0),
-      "topic2" -> new TopicMetadata("topic2", List.empty, 0))
+      "topic1" -> new TopicMetadata("topic1", List.empty, Errors.NONE),
+      "topic2" -> new TopicMetadata("topic2", List.empty, Errors.NONE))
     var numberOfCalls: AtomicInteger = new AtomicInteger(0)
 
     def getTopicInfo(topics: Set[String]) = {
@@ -53,7 +55,7 @@ class TestTopicMetadataCache {
     }
 
     def setErrorCode(topic: String, errorCode: Short) {
-      mockCache += topic -> new TopicMetadata(topic, List.empty, errorCode)
+      mockCache += topic -> new TopicMetadata(topic, List.empty, Errors.forCode(errorCode))
     }
   }
 
@@ -70,7 +72,7 @@ class TestTopicMetadataCache {
     mockStore.setErrorCode("topic1", 3)
     var metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
     assertEquals("topic1", metadata("topic1").topic)
-    assertEquals(3, metadata("topic1").errorCode)
+    assertEquals(3, metadata("topic1").error.code)
     assertEquals(1, mockStore.numberOfCalls.get())
 
     // Retrieve the same topic from the cache which has an error code. Ensure the store is called to refresh the cache
@@ -78,21 +80,21 @@ class TestTopicMetadataCache {
     mockStore.setErrorCode("topic1", 0)
     metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
     assertEquals("topic1", metadata("topic1").topic)
-    assertEquals(0, metadata("topic1").errorCode)
+    assertEquals(0, metadata("topic1").error.code)
     assertEquals(2, mockStore.numberOfCalls.get())
 
     // Retrieve the same topic from the cache with refresh rate greater than the last update. Ensure the store is not
     // called
     metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
     assertEquals("topic1", metadata("topic1").topic)
-    assertEquals(0, metadata("topic1").errorCode)
+    assertEquals(0, metadata("topic1").error.code)
     assertEquals(2, mockStore.numberOfCalls.get())
 
     // Ensure that refresh happens when refresh rate is less than the last update. Ensure the store is called
     mockTime.currentValue = 11
     metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
     assertEquals("topic1", metadata("topic1").topic)
-    assertEquals(0, metadata("topic1").errorCode)
+    assertEquals(0, metadata("topic1").error.code)
     assertEquals(3, mockStore.numberOfCalls.get())
   }
 
@@ -113,7 +115,7 @@ class TestTopicMetadataCache {
           waitForThreadStart.await()
           val metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
           numAssertionSuccess.compareAndSet(true, metadata("topic1").topic.equals("topic1"))
-          numAssertionSuccess.compareAndSet(true, metadata("topic1").errorCode == 0)
+          numAssertionSuccess.compareAndSet(true, metadata("topic1").error.code == 0)
         }
       })
       threads(i).start()
@@ -127,11 +129,11 @@ class TestTopicMetadataCache {
 
   @Test
   def testBadErrorCodes {
-    val partitionMetadataBad = new PartitionMetadata(0, None, Seq(), errorCode = ErrorMapping.LeaderNotAvailableCode)
-    val partitionMetadataGood = new PartitionMetadata(0, None, Seq(), errorCode = ErrorMapping.NoError)
-    assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, ErrorMapping.RequestTimedOutCode)))
-    assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataBad), ErrorMapping.NoError)))
-    assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, ErrorMapping.NoError)))
-    assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataGood), ErrorMapping.NoError)))
+    val partitionMetadataBad = new PartitionMetadata(0, None, Seq(), error = Errors.LEADER_NOT_AVAILABLE)
+    val partitionMetadataGood = new PartitionMetadata(0, None, Seq(), error = Errors.NONE)
+    assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, Errors.REQUEST_TIMED_OUT)))
+    assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataBad), Errors.NONE)))
+    assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, Errors.NONE)))
+    assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataGood), Errors.NONE)))
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala b/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala
index 848cfc8..3548aea 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala
@@ -20,14 +20,15 @@
 package org.apache.samza.utils
 
 import org.junit.Test
-import org.scalatest.{ Matchers => ScalaTestMatchers }
+import org.scalatest.{Matchers => ScalaTestMatchers}
 import org.apache.samza.util.KafkaUtil
 import kafka.common.ErrorMapping
+import org.apache.kafka.common.protocol.Errors
 
 class TestKafkaUtil extends ScalaTestMatchers {
   @Test
   def testMaybeThrowException {
-    intercept[Exception] { KafkaUtil.maybeThrowException(ErrorMapping.UnknownTopicOrPartitionCode) }
-    KafkaUtil.maybeThrowException(ErrorMapping.ReplicaNotAvailableCode)
+    intercept[Exception] { KafkaUtil.maybeThrowException(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()) }
+    KafkaUtil.maybeThrowException(Errors.REPLICA_NOT_AVAILABLE.exception())
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 9c5dad5..97fe1f8 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -541,7 +541,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     // Set up kafka topics.
     publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
 
-    MapConfig kafkaProducerConfig = new MapConfig(ImmutableMap.of(String.format("systems.%s.producer.%s", TEST_SYSTEM, ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG), "1000"));
+    MapConfig kafkaProducerConfig = new MapConfig(ImmutableMap.of(String.format("systems.%s.producer.%s", TEST_SYSTEM, ProducerConfig.MAX_BLOCK_MS_CONFIG), "1000"));
     MapConfig applicationRunnerConfig1 = new MapConfig(ImmutableList.of(applicationConfig1, kafkaProducerConfig));
     MapConfig applicationRunnerConfig2 = new MapConfig(ImmutableList.of(applicationConfig2, kafkaProducerConfig));
     LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationRunnerConfig1);

http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 4ba51f3..d9261ad 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -117,9 +117,7 @@ object StreamTaskTestUtil {
     })
 
     servers = configs.map(TestUtils.createServer(_)).toBuffer
-
-    val brokerList = TestUtils.getBrokerListStrFromServers(servers, SecurityProtocol.PLAINTEXT)
-    brokers = brokerList.split(",").map(p => "127.0.0.1" + p).mkString(",")
+    brokers = TestUtils.getBrokerListStrFromServers(servers, SecurityProtocol.PLAINTEXT)
 
     // setup the zookeeper and bootstrap servers for local kafka cluster
     jobConfig ++= Map("systems.kafka.consumer.zookeeper.connect" -> zkConnect,
@@ -161,9 +159,8 @@ object StreamTaskTestUtil {
 
         topics.foreach(topic => {
           val topicMetadata = topicMetadataMap(topic)
-          val errorCode = topicMetadata.errorCode
 
-          KafkaUtil.maybeThrowException(errorCode)
+          KafkaUtil.maybeThrowException(topicMetadata.error.exception())
         })
 
         done = true