Posted to commits@samza.apache.org by ni...@apache.org on 2016/12/23 20:23:02 UTC

samza git commit: SAMZA-855; Upgrade Samza's Kafka client version to 0.10.0.1.

Repository: samza
Updated Branches:
  refs/heads/master 6666ced0c -> c65802a98


SAMZA-855; Upgrade Samza's Kafka client version to 0.10.0.1.

This is based on PR 15, with merge conflicts resolved and the zkClient version set to 0.8 (compatible with Kafka 0.10.0.1).

Tested and validated this patch against an internal Samza build at LinkedIn. This looks good.
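
For readers skimming the diff: the recurring change across the Scala sources is replacing the raw ZkClient (constructed with ZKStringSerializer) with Kafka's ZkUtils wrapper, which handles serialization itself and carries the secure-ACL flag. A minimal before/after sketch, with illustrative names and assuming a zkConnect string is in scope:

    // imports: org.I0Itec.zkclient.ZkClient, kafka.utils.{ZKStringSerializer, ZkUtils}

    // Kafka 0.8.2.1 style: raw ZkClient plus Kafka's string serializer.
    val oldConnectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)

    // Kafka 0.10.0.1 style: ZkUtils owns serialization; the last argument
    // enables secure ZK ACLs (hardcoded false here, or derived via
    // JaasUtils.isZkSecurityEnabled() as the updated tests do).
    val newConnectZk = () => ZkUtils(zkConnect, 6000, 6000, false)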

Author: Shanthoosh Venkataraman <sv...@linkedin.com>

Reviewers: Yi Pan <ni...@gmail.com>

Closes #33 from shanthoosh/kafka_10_upgrade


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c65802a9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c65802a9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c65802a9

Branch: refs/heads/master
Commit: c65802a98ed8ee7c6481352387e42c58682dc067
Parents: 6666ced
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Fri Dec 23 12:22:55 2016 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Fri Dec 23 12:22:55 2016 -0800

----------------------------------------------------------------------
 build.gradle                                    |   3 +
 gradle/dependency-versions.gradle               |   4 +-
 .../kafka/KafkaCheckpointManager.scala          |   7 +-
 .../kafka/KafkaCheckpointManagerFactory.scala   |  11 +-
 .../samza/config/RegExTopicGenerator.scala      |   6 +-
 .../samza/system/kafka/KafkaSystemAdmin.scala   |  11 +-
 .../system/kafka/KafkaSystemConsumer.scala      |  14 +--
 .../samza/system/kafka/KafkaSystemFactory.scala |   9 +-
 .../scala/org/apache/samza/util/KafkaUtil.scala |   4 +-
 .../samza/system/kafka/MockKafkaProducer.java   |  18 ++-
 .../java/org/apache/samza/utils/TestUtils.java  | 112 -------------------
 .../kafka/TestKafkaCheckpointManager.scala      | 101 ++++++++---------
 .../system/kafka/TestKafkaSystemAdmin.scala     |  98 ++++++++--------
 .../system/kafka/TestKafkaSystemConsumer.scala  |  10 +-
 .../system/kafka/TestKafkaSystemProducer.scala  |   8 +-
 .../src/main/python/configs/downloads.json      |   4 +-
 samza-test/src/main/python/configs/kafka.json   |  22 ++--
 .../src/main/python/configs/zookeeper.json      |  10 +-
 .../test/integration/StreamTaskTestUtil.scala   | 103 ++++++++---------
 19 files changed, 220 insertions(+), 335 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 0c04931..5b41c52 100644
--- a/build.gradle
+++ b/build.gradle
@@ -257,6 +257,7 @@ project(":samza-kafka_$scalaVersion") {
     compile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion"
     compile "org.apache.kafka:kafka-clients:$kafkaVersion"
     testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
+    testCompile "org.apache.kafka:kafka-clients:$kafkaVersion:test"
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
     testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
@@ -648,6 +649,8 @@ project(":samza-test_$scalaVersion") {
     testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
     testCompile "com.101tec:zkclient:$zkClientVersion"
     testCompile project(":samza-kafka_$scalaVersion")
+    testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
+    testCompile "org.apache.kafka:kafka-clients:$kafkaVersion:test"
     testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
     testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
     testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"

http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 1987745..976a49c 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -25,10 +25,10 @@
   junitVersion = "4.8.1"
   mockitoVersion = "1.8.4"
   scalaTestVersion = "2.2.4"
-  zkClientVersion = "0.3"
+  zkClientVersion = "0.8"
   zookeeperVersion = "3.3.4"
   metricsVersion = "2.2.0"
-  kafkaVersion = "0.8.2.1"
+  kafkaVersion = "0.10.0.1"
   commonsHttpClientVersion = "3.1"
   rocksdbVersion = "3.13.1"
   yarnVersion = "2.6.1"

http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 8f18a92..6461f9d 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -27,8 +27,9 @@ import kafka.api._
 import kafka.common.{ErrorMapping, InvalidMessageSizeException, TopicAndPartition, UnknownTopicOrPartitionException}
 import kafka.consumer.SimpleConsumer
 import kafka.message.InvalidMessageException
-import kafka.utils.Utils
-import org.I0Itec.zkclient.ZkClient
+import kafka.utils.ZkUtils
+
+import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
 import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
@@ -56,7 +57,7 @@ class KafkaCheckpointManager(
                               fetchSize: Int,
                               val metadataStore: TopicMetadataStore,
                               connectProducer: () => Producer[Array[Byte], Array[Byte]],
-                              val connectZk: () => ZkClient,
+                              val connectZk: () => ZkUtils,
                               systemStreamPartitionGrouperFactoryString: String,
                               failOnCheckpointValidation: Boolean,
                               val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,

http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index 4e97376..c42882e 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -21,8 +21,7 @@ package org.apache.samza.checkpoint.kafka
 
 import java.util.Properties
 
-import kafka.utils.ZKStringSerializer
-import org.I0Itec.zkclient.ZkClient
+import kafka.utils.ZkUtils
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.{CheckpointManager, CheckpointManagerFactory}
@@ -40,8 +39,8 @@ object KafkaCheckpointManagerFactory {
     "compression.type" -> "none")
 
   // Set the checkpoint topic configs to have a very small segment size and
-  // enable log compaction. This keeps job startup time small since there 
-  // are fewer useless (overwritten) messages to read from the checkpoint 
+  // enable log compaction. This keeps job startup time small since there
+  // are fewer useless (overwritten) messages to read from the checkpoint
   // topic.
   def getCheckpointTopicProperties(config: Config) = {
     val segmentBytes: Int = if (config == null) {
@@ -79,7 +78,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
     val zkConnect = Option(consumerConfig.zkConnect)
       .getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
     val connectZk = () => {
-      new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+      ZkUtils(zkConnect, 6000, 6000, false)
     }
     val socketTimeout = consumerConfig.socketTimeoutMs
 
@@ -99,4 +98,4 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
       config.failOnCheckpointValidation,
       checkpointTopicProperties = getCheckpointTopicProperties(config))
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
index bcbad27..4e3b247 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
@@ -20,7 +20,7 @@
 package org.apache.samza.config
 
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ ZkUtils, ZKStringSerializer }
+import kafka.utils.ZkUtils
 import org.apache.samza.config.KafkaConfig.{ Config2Kafka, REGEX_RESOLVED_STREAMS }
 import org.apache.samza.SamzaException
 import org.apache.samza.util.Util
@@ -102,10 +102,10 @@ class RegExTopicGenerator extends ConfigRewriter with Logging {
     val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, "")
     val zkConnect = Option(consumerConfig.zkConnect)
       .getOrElse(throw new SamzaException("No zookeeper.connect for system %s defined in config." format systemName))
-    val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+    val zkClient = new ZkClient(zkConnect, 6000, 6000)
 
     try {
-      ZkUtils.getAllTopics(zkClient)
+      ZkUtils(zkClient, isZkSecurityEnabled = false).getAllTopics()
     } finally {
       zkClient.close()
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 5927cca..955fa44 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -21,14 +21,15 @@ package org.apache.samza.system.kafka
 
 import java.util
 
-import org.I0Itec.zkclient.ZkClient
 import org.apache.samza.Partition
 import org.apache.samza.SamzaException
 import org.apache.samza.system.{ExtendedSystemAdmin, SystemStreamMetadata, SystemStreamPartition}
-import org.apache.samza.util.{ ClientUtilTopicMetadataStore, ExponentialSleepStrategy, Logging }
+import org.apache.samza.util.{ ClientUtilTopicMetadataStore, ExponentialSleepStrategy, Logging, KafkaUtil }
 import kafka.api._
 import kafka.consumer.SimpleConsumer
 import kafka.common.{ TopicExistsException, TopicAndPartition }
+import kafka.consumer.ConsumerConfig
+import kafka.utils.ZkUtils
 import java.util.{ Properties, UUID }
 import scala.collection.JavaConversions
 import scala.collection.JavaConversions._
@@ -37,6 +38,7 @@ import kafka.consumer.ConsumerConfig
 import kafka.admin.AdminUtils
 import org.apache.samza.util.KafkaUtil
 
+
 object KafkaSystemAdmin extends Logging {
   /**
    * A helper method that takes oldest, newest, and upcoming offsets for each
@@ -97,10 +99,10 @@ class KafkaSystemAdmin(
   brokerListString: String,
 
   /**
-   * A method that returns a ZkClient for the Kafka system. This is invoked
+   * A method that returns a ZkUtils for the Kafka system. This is invoked
    * when the system admin is attempting to create a coordinator stream.
    */
-  connectZk: () => ZkClient,
+  connectZk: () => ZkUtils,
 
   /**
    * Custom properties to use when the system admin tries to create a new
@@ -183,6 +185,7 @@ class KafkaSystemAdmin(
    * Returns the offset for the message after the specified offset for each
    * SystemStreamPartition that was passed in.
    */
+
   override def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = {
     // This is safe to do with Kafka, even if a topic is key-deduped. If the
     // offset doesn't exist on a compacted topic, Kafka will return the first

http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 36567b6..fa685ee 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -24,7 +24,7 @@ import org.apache.samza.util.Logging
 import kafka.message.Message
 import kafka.message.MessageAndOffset
 import org.apache.samza.Partition
-import kafka.utils.Utils
+import org.apache.kafka.common.utils.Utils
 import org.apache.samza.util.Clock
 import java.util.UUID
 import kafka.serializer.DefaultDecoder
@@ -197,13 +197,13 @@ private[kafka] class KafkaSystemConsumer(
         // This avoids trying to re-add the same topic partition repeatedly
         def refresh(tp: List[TopicAndPartition]) = {
           val head :: rest = tpToRefresh
-          // refreshBrokers can be called from abdicate and refreshDropped, 
-          // both of which are triggered from BrokerProxy threads. To prevent 
-          // accidentally creating multiple objects for the same broker, or 
-          // accidentally not updating the topicPartitionsAndOffsets variable, 
-          // we need to lock. 
+          // refreshBrokers can be called from abdicate and refreshDropped,
+          // both of which are triggered from BrokerProxy threads. To prevent
+          // accidentally creating multiple objects for the same broker, or
+          // accidentally not updating the topicPartitionsAndOffsets variable,
+          // we need to lock.
           this.synchronized {
-            // Check if we still need this TopicAndPartition inside the 
+            // Check if we still need this TopicAndPartition inside the
             // critical section. If we don't, then skip it.
             topicPartitionsAndOffsets.get(head) match {
               case Some(nextOffset) =>

http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index b574176..d0e3089 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -20,6 +20,7 @@
 package org.apache.samza.system.kafka
 
 import java.util.Properties
+import kafka.utils.ZkUtils
 import org.apache.samza.SamzaException
 import org.apache.samza.util.{Logging, KafkaUtil, ExponentialSleepStrategy, ClientUtilTopicMetadataStore}
 import org.apache.samza.config.Config
@@ -29,8 +30,6 @@ import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.samza.system.SystemFactory
 import org.apache.samza.config.StorageConfig._
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils.ZKStringSerializer
 import org.apache.samza.system.SystemProducer
 import org.apache.samza.system.SystemAdmin
 import org.apache.samza.system.SystemConsumer
@@ -90,8 +89,8 @@ class KafkaSystemFactory extends SystemFactory with Logging {
     val getProducer = () => { new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties) }
     val metrics = new KafkaSystemProducerMetrics(systemName, registry)
 
-    // Unlike consumer, no need to use encoders here, since they come for free 
-    // inside the producer configs. Kafka's producer will handle all of this 
+    // Unlike consumer, no need to use encoders here, since they come for free
+    // inside the producer configs. Kafka's producer will handle all of this
     // for us.
 
     new KafkaSystemProducer(
@@ -111,7 +110,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
     val zkConnect = Option(consumerConfig.zkConnect)
       .getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
     val connectZk = () => {
-      new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+      ZkUtils(zkConnect, 6000, 6000, false)
     }
     val coordinatorStreamProperties = getCoordinatorTopicProperties(config)
     val coordinatorStreamReplicationFactor = config.getCoordinatorReplicationFactor.toInt

http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index e95f052..0f0bc22 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -22,7 +22,7 @@ package org.apache.samza.util
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicLong
 import kafka.admin.AdminUtils
-import org.I0Itec.zkclient.ZkClient
+import kafka.utils.ZkUtils
 import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
 import org.apache.kafka.common.PartitionInfo
 import org.apache.samza.config.Config
@@ -86,7 +86,7 @@ object KafkaUtil extends Logging {
 }
 
 class KafkaUtil(val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
-                val connectZk: () => ZkClient) extends Logging {
+                val connectZk: () => ZkUtils) extends Logging {
   /**
    * Common code for creating a topic in Kafka
    *

http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
index 6f498de..aaa949d 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
@@ -39,9 +39,9 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.samza.utils.TestUtils;
 import org.apache.kafka.common.MetricName;
-
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.test.TestUtils;
 
 public class MockKafkaProducer implements Producer<byte[], byte[]> {
 
@@ -113,7 +113,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
   }
 
   private RecordMetadata getRecordMetadata(ProducerRecord record) {
-    return new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, this.msgsSent.get());
+    return new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, this.msgsSent.get(), Record.NO_TIMESTAMP, -1, -1, -1);
   }
 
   @Override
@@ -174,6 +174,16 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
 
   }
 
+  @Override
+  public void close(long timeout, TimeUnit timeUnit) {
+
+  }
+
+  public synchronized void flush () {
+
+  }
+
+
   private static class FutureFailure implements Future<RecordMetadata> {
 
     private final ExecutionException exception;
@@ -215,7 +225,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
 
     public FutureSuccess(ProducerRecord record, int offset) {
       this.record = record;
-      this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset);
+      this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset, Record.NO_TIMESTAMP, -1, -1, -1);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java b/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java
deleted file mode 100644
index 2fa743f..0000000
--- a/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.samza.utils;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-
-import static java.util.Arrays.asList;
-
-
-/**
- * Copied from :kafka-clients API as a workaround until KAFKA-1861 is resolved
- * Helper functions for writing unit tests
- */
-public class TestUtils {
-
-    public static File IO_TMP_DIR = new File(System.getProperty("java.io.tmpdir"));
-
-    public static String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
-    public static String DIGITS = "0123456789";
-    public static String LETTERS_AND_DIGITS = LETTERS + DIGITS;
-
-    /* A consistent random number generator to make tests repeatable */
-    public static final Random seededRandom = new Random(192348092834L);
-    public static final Random random = new Random();
-
-    public static Cluster singletonCluster(String topic, int partitions) {
-        return clusterWith(1, topic, partitions);
-    }
-
-    public static Cluster clusterWith(int nodes, String topic, int partitions) {
-        Node[] ns = new Node[nodes];
-        for (int i = 0; i < nodes; i++)
-            ns[i] = new Node(0, "localhost", 1969);
-        List<PartitionInfo> parts = new ArrayList<PartitionInfo>();
-        for (int i = 0; i < partitions; i++)
-            parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
-        return new Cluster(asList(ns), parts);
-    }
-
-    /**
-     * Choose a number of random available ports
-     */
-    public static int[] choosePorts(int count) {
-        try {
-            ServerSocket[] sockets = new ServerSocket[count];
-            int[] ports = new int[count];
-            for (int i = 0; i < count; i++) {
-                sockets[i] = new ServerSocket(0);
-                ports[i] = sockets[i].getLocalPort();
-            }
-            for (int i = 0; i < count; i++)
-                sockets[i].close();
-            return ports;
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Choose an available port
-     */
-    public static int choosePort() {
-        return choosePorts(1)[0];
-    }
-
-    /**
-     * Generate an array of random bytes
-     * 
-     * @param size The size of the array
-     */
-    public static byte[] randomBytes(int size) {
-        byte[] bytes = new byte[size];
-        seededRandom.nextBytes(bytes);
-        return bytes;
-    }
-
-    /**
-     * Generate a random string of letters and digits of the given length
-     * 
-     * @param len The length of the string
-     * @return The random string
-     */
-    public static String randomString(int len) {
-        StringBuilder b = new StringBuilder();
-        for (int i = 0; i < len; i++)
-            b.append(LETTERS_AND_DIGITS.charAt(seededRandom.nextInt(LETTERS_AND_DIGITS.length())));
-        return b.toString();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index e6815da..1f2f62f 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -22,10 +22,11 @@ package org.apache.samza.checkpoint.kafka
 import kafka.admin.AdminUtils
 import kafka.common.{InvalidMessageSizeException, UnknownTopicOrPartitionException}
 import kafka.message.InvalidMessageException
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{TestUtils, TestZKUtils, Utils, ZKStringSerializer}
-import kafka.zk.EmbeddedZookeeper
-import org.I0Itec.zkclient.ZkClient
+import kafka.server.{KafkaConfig, KafkaServer, ConfigType}
+import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
+import kafka.integration.KafkaServerTestHarness
+
+import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord}
 import org.apache.samza.checkpoint.Checkpoint
 import org.apache.samza.config.{JobConfig, KafkaProducerConfig, MapConfig}
@@ -41,71 +42,59 @@ import org.junit._
 import scala.collection.JavaConversions._
 import scala.collection._
 
-class TestKafkaCheckpointManager {
+class TestKafkaCheckpointManager extends KafkaServerTestHarness {
+
+  protected def numBrokers: Int = 3
+
+  def generateConfigs() = {
+    val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, true)
+    props.map(KafkaConfig.fromProps)
+  }
 
   val checkpointTopic = "checkpoint-topic"
   val serdeCheckpointTopic = "checkpoint-topic-invalid-serde"
   val checkpointTopicConfig = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(null)
-  val zkConnect: String = TestZKUtils.zookeeperConnect
-  var zkClient: ZkClient = null
-  val zkConnectionTimeout = 6000
-  val zkSessionTimeout = 6000
-
-  val brokerId1 = 0
-  val brokerId2 = 1
-  val brokerId3 = 2
-  val ports = TestUtils.choosePorts(3)
-  val (port1, port2, port3) = (ports(0), ports(1), ports(2))
-
-  val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
-  props1.put("controlled.shutdown.enable", "true")
-  val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
-  props1.put("controlled.shutdown.enable", "true")
-  val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
-  props1.put("controlled.shutdown.enable", "true")
-
-  val config = new java.util.HashMap[String, Object]()
-  val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
-  config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
-  config.put("acks", "all")
-  config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
-  config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(java.lang.Integer.MAX_VALUE-1)).toString)
-  config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
-  val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+
+  val zkSecure = JaasUtils.isZkSecurityEnabled()
+
   val partition = new Partition(0)
   val partition2 = new Partition(1)
   val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123"))
   val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "12345"))
-  var zookeeper: EmbeddedZookeeper = null
-  var server1: KafkaServer = null
-  var server2: KafkaServer = null
-  var server3: KafkaServer = null
+
+  var producerConfig: KafkaProducerConfig = null
+
   var metadataStore: TopicMetadataStore = null
   var failOnTopicValidation = true
 
   val systemStreamPartitionGrouperFactoryString = classOf[GroupByPartitionFactory].getCanonicalName
 
   @Before
-  def beforeSetupServers {
-    zookeeper = new EmbeddedZookeeper(zkConnect)
-    server1 = TestUtils.createServer(new KafkaConfig(props1))
-    server2 = TestUtils.createServer(new KafkaConfig(props2))
-    server3 = TestUtils.createServer(new KafkaConfig(props3))
+  override def setUp {
+    super.setUp
+
+    TestUtils.waitUntilTrue(() => servers.head.metadataCache.getAliveBrokers.size == numBrokers, "Wait for cache to update")
+
+    val config = new java.util.HashMap[String, Object]()
+    val brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")
+
+    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
+    config.put("acks", "all")
+    config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
+    config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(java.lang.Integer.MAX_VALUE-1)).toString)
+    config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
+    producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+
     metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
   }
 
   @After
-  def afterCleanLogDirs {
-    server1.shutdown
-    server1.awaitShutdown()
-    server2.shutdown
-    server2.awaitShutdown()
-    server3.shutdown
-    server3.awaitShutdown()
-    Utils.rm(server1.config.logDirs)
-    Utils.rm(server2.config.logDirs)
-    Utils.rm(server3.config.logDirs)
-    zookeeper.shutdown
+  override def tearDown() {
+    if (servers != null) {
+      servers.foreach(_.shutdown())
+      servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+    }
+    super.tearDown
   }
 
   private def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint, cpTopic: String = checkpointTopic) = {
@@ -127,7 +116,7 @@ class TestKafkaCheckpointManager {
 
 
   private def createCheckpointTopic(cpTopic: String = checkpointTopic, partNum: Int = 1) = {
-    val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+    val zkClient = ZkUtils(zkConnect, 6000, 6000, zkSecure)
     try {
       AdminUtils.createTopic(
         zkClient,
@@ -151,8 +140,8 @@ class TestKafkaCheckpointManager {
     kcm.kafkaUtil.validateTopicPartitionCount(checkpointTopic, "kafka", metadataStore, 1)
 
     // check that log compaction is enabled.
-    val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
-    val topicConfig = AdminUtils.fetchTopicConfig(zkClient, checkpointTopic)
+    val zkClient = ZkUtils(zkConnect, 6000, 6000, zkSecure)
+    val topicConfig = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, checkpointTopic)
     zkClient.close
     assertEquals("compact", topicConfig.get("cleanup.policy"))
     assertEquals("26214400", topicConfig.get("segment.bytes"))
@@ -242,7 +231,7 @@ class TestKafkaCheckpointManager {
     fetchSize = 300 * 1024,
     metadataStore = metadataStore,
     connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties),
-    connectZk = () => new ZkClient(zkConnect, 60000, 60000, ZKStringSerializer),
+    connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure),
     systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
     failOnCheckpointValidation = failOnTopicValidation,
     checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]())))
@@ -261,7 +250,7 @@ class TestKafkaCheckpointManager {
     fetchSize = 300 * 1024,
     metadataStore = metadataStore,
     connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties),
-    connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
+    connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure),
     systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
     failOnCheckpointValidation = failOnTopicValidation,
     serde = new InvalideSerde(exception),

http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index f00405d..0e3c9b5 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -28,10 +28,11 @@ import kafka.admin.AdminUtils
 import kafka.common.{ErrorMapping, LeaderNotAvailableException}
 import kafka.consumer.{Consumer, ConsumerConfig}
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{TestUtils, TestZKUtils, Utils, ZKStringSerializer}
-import kafka.zk.EmbeddedZookeeper
-import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import kafka.utils.{TestUtils, ZkUtils}
+import kafka.integration.KafkaServerTestHarness
+import org.apache.kafka.common.security.JaasUtils
+
 import org.apache.samza.Partition
 import org.apache.samza.config.KafkaProducerConfig
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
@@ -42,54 +43,56 @@ import org.junit._
 
 import scala.collection.JavaConversions._
 
-object TestKafkaSystemAdmin {
+
+object TestKafkaSystemAdmin extends KafkaServerTestHarness {
+
   val SYSTEM = "kafka"
   val TOPIC = "input"
   val TOPIC2 = "input2"
   val TOTAL_PARTITIONS = 50
   val REPLICATION_FACTOR = 2
+  val zkSecure = JaasUtils.isZkSecurityEnabled()
+
+  protected def numBrokers: Int = 3
 
-  val zkConnect: String = TestZKUtils.zookeeperConnect
-  var zkClient: ZkClient = null
-  val zkConnectionTimeout = 6000
-  val zkSessionTimeout = 6000
-  val brokerId1 = 0
-  val brokerId2 = 1
-  val brokerId3 = 2
-  val ports = TestUtils.choosePorts(3)
-  val (port1, port2, port3) = (ports(0), ports(1), ports(2))
-
-  val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
-  val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
-  val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
-
-  val config = new util.HashMap[String, Object]()
-  val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
-  config.put("bootstrap.servers", brokers)
-  config.put("acks", "all")
-  config.put("serializer.class", "kafka.serializer.StringEncoder")
-  val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
   var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
-  var zookeeper: EmbeddedZookeeper = null
-  var server1: KafkaServer = null
-  var server2: KafkaServer = null
-  var server3: KafkaServer = null
   var metadataStore: TopicMetadataStore = null
+  var producerConfig: KafkaProducerConfig = null
+  var brokers: String = null
+
+  def generateConfigs() = {
+    val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, true)
+    props.map(KafkaConfig.fromProps)
+  }
 
   @BeforeClass
-  def beforeSetupServers {
-    zookeeper = new EmbeddedZookeeper(zkConnect)
-    server1 = TestUtils.createServer(new KafkaConfig(props1))
-    server2 = TestUtils.createServer(new KafkaConfig(props2))
-    server3 = TestUtils.createServer(new KafkaConfig(props3))
-    zkClient = new ZkClient(zkConnect + "/", 6000, 6000, ZKStringSerializer)
+  override def setUp {
+    super.setUp
+
+    val config = new java.util.HashMap[String, Object]()
+
+    brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")
+
+    config.put("bootstrap.servers", brokers)
+    config.put("acks", "all")
+    config.put("serializer.class", "kafka.serializer.StringEncoder")
+
+    producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+
     producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
     metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
   }
 
+
+  @AfterClass
+  override def tearDown {
+    super.tearDown
+  }
+
+
   def createTopic(topicName: String, partitionCount: Int) {
     AdminUtils.createTopic(
-      zkClient,
+      zkUtils,
       topicName,
       partitionCount,
       REPLICATION_FACTOR)
@@ -133,21 +136,6 @@ object TestKafkaSystemAdmin {
     Consumer.create(consumerConfig)
   }
 
-  @AfterClass
-  def afterCleanLogDirs {
-    producer.close()
-    server1.shutdown
-    server1.awaitShutdown()
-    server2.shutdown
-    server2.awaitShutdown()
-    server3.shutdown
-    server3.awaitShutdown()
-    Utils.rm(server1.config.logDirs)
-    Utils.rm(server2.config.logDirs)
-    Utils.rm(server3.config.logDirs)
-    zkClient.close
-    zookeeper.shutdown
-  }
 }
 
 /**
@@ -158,7 +146,7 @@ class TestKafkaSystemAdmin {
   import TestKafkaSystemAdmin._
 
   // Provide a random zkAddress, the system admin tries to connect only when a topic is created/validated
-  val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer))
+  val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure))
 
   @Test
   def testShouldAssembleMetadata {
@@ -219,7 +207,7 @@ class TestKafkaSystemAdmin {
     // Empty Kafka topics should have a next offset of 0.
     assertEquals("0", sspMetadata.get(new Partition(0)).getUpcomingOffset)
 
-    // Add a new message to one of the partitions, and verify that it works as 
+    // Add a new message to one of the partitions, and verify that it works as
     // expected.
     producer.send(new ProducerRecord(TOPIC, 48, "key1".getBytes, "val1".getBytes)).get()
     metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC))
@@ -285,7 +273,8 @@ class TestKafkaSystemAdmin {
   @Test
   def testShouldCreateCoordinatorStream {
     val topic = "test-coordinator-stream"
-    val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer), coordinatorStreamReplicationFactor = 3)
+    val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamReplicationFactor = 3)
+
     systemAdmin.createCoordinatorStream(topic)
     validateTopic(topic, 1)
     val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), "kafka", metadataStore.getTopicInfo)
@@ -296,9 +285,10 @@ class TestKafkaSystemAdmin {
     assertEquals(3, partitionMetadata.replicas.size)
   }
 
-  class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin(SYSTEM, brokers, () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)) {
+  class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin(SYSTEM, brokers, () => ZkUtils(zkConnect, 6000, 6000, zkSecure)) {
     import kafka.api.TopicMetadata
     var metadataCallCount = 0
+
     // Simulate Kafka telling us that the leader for the topic is not available
     override def getTopicMetadata(topics: Set[String]) = {
       metadataCallCount += 1

http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
index 3b3ed3d..8a5cbc2 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
@@ -67,7 +67,7 @@ class TestKafkaSystemConsumer {
     val metrics = new KafkaSystemConsumerMetrics
     // Lie and tell the store that the partition metadata is empty. We can't
     // use partition metadata because it has Broker in its constructor, which
-    // is package private to Kafka. 
+    // is package private to Kafka.
     val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, 0)))
     var hosts = List[String]()
     var getHostPortCount = 0
@@ -81,7 +81,7 @@ class TestKafkaSystemConsumer {
       override def createBrokerProxy(host: String, port: Int): BrokerProxy = {
         new BrokerProxy(host, port, systemName, "", metrics, sink) {
           override def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
-            // Skip this since we normally do verification of offsets, which 
+            // Skip this since we normally do verification of offsets, which
             // tries to connect to Kafka. Rather than mock that, just forget it.
             nextOffsets.size
           }
@@ -159,13 +159,12 @@ class TestKafkaSystemConsumer {
 
     val msg = Array[Byte](5, 112, 9, 126)
     val msgAndOffset: MessageAndOffset = MessageAndOffset(new Message(msg), 887654)
-    // 4 data + 14 Message overhead + 80 IncomingMessageEnvelope overhead
+    // 4 data + 18 Message overhead + 80 IncomingMessageEnvelope overhead
     consumer.sink.addMessage(new TopicAndPartition("test-stream", 0),  msgAndOffset, 887354)
 
-    assertEquals(98, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0))))
+    assertEquals(106, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0))))
   }
 
-
   @Test
   def testFetchThresholdBytesDisabled {
     val metadataStore = new MockMetadataStore
@@ -190,4 +189,3 @@ class TestKafkaSystemConsumer {
 class MockMetadataStore(var metadata: Map[String, TopicMetadata] = Map()) extends TopicMetadataStore {
   def getTopicInfo(topics: Set[String]): Map[String, TopicMetadata] = metadata
 }
-

http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
index e4250e7..7331611 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
@@ -27,6 +27,7 @@ import java.util
 import org.junit.Assert._
 import org.scalatest.Assertions.intercept
 import org.apache.kafka.common.errors.{TimeoutException, RecordTooLargeException}
+import org.apache.kafka.test.MockSerializer
 import org.apache.samza.SamzaException
 
 
@@ -37,13 +38,14 @@ class TestKafkaSystemProducer {
 
   @Test
   def testKafkaProducer {
+    val mockProducer = new MockProducer(true, new MockSerializer, new MockSerializer)
     val systemProducer = new KafkaSystemProducer(systemName = "test",
-                                           getProducer = () => { new MockProducer(true) },
-                                           metrics = new KafkaSystemProducerMetrics)
+                                                 getProducer = () => mockProducer,
+                                                 metrics = new KafkaSystemProducerMetrics)
     systemProducer.register("test")
     systemProducer.start
     systemProducer.send("test", someMessage)
-    assertEquals(1, systemProducer.producer.asInstanceOf[MockProducer].history().size())
+    assertEquals(1, systemProducer.producer.asInstanceOf[MockProducer[Array[Byte], Array[Byte]]].history().size())
     systemProducer.stop
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-test/src/main/python/configs/downloads.json
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/configs/downloads.json b/samza-test/src/main/python/configs/downloads.json
index 22694c0..c468156 100644
--- a/samza-test/src/main/python/configs/downloads.json
+++ b/samza-test/src/main/python/configs/downloads.json
@@ -1,5 +1,5 @@
 {
-  "url_kafka": "http://www.us.apache.org/dist/kafka/0.8.2.0/kafka_2.9.2-0.8.2.0.tgz",
-  "url_zookeeper": "http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz",
+  "url_kafka": "http://archive.apache.org/dist/kafka/0.10.0.1/kafka_2.10-0.10.0.1.tgz",
+  "url_zookeeper": "http://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz",
   "url_hadoop": "https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1.tar.gz"
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-test/src/main/python/configs/kafka.json
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/configs/kafka.json b/samza-test/src/main/python/configs/kafka.json
index ab2f346..ece2a34 100644
--- a/samza-test/src/main/python/configs/kafka.json
+++ b/samza-test/src/main/python/configs/kafka.json
@@ -3,21 +3,21 @@
     "kafka_instance_0": "localhost"
   },
   "kafka_port": 9092,
-  "kafka_start_cmd": "kafka_2.9.2-0.8.2.0/bin/kafka-server-start.sh -daemon kafka_2.9.2-0.8.2.0/config/server.properties",
-  "kafka_stop_cmd": "kafka_2.9.2-0.8.2.0/bin/kafka-server-stop.sh",
+  "kafka_start_cmd": "kafka_2.10-0.10.0.1/bin/kafka-server-start.sh -daemon kafka_2.10-0.10.0.1/config/server.properties",
+  "kafka_stop_cmd": "kafka_2.10-0.10.0.1/bin/kafka-server-stop.sh",
   "kafka_install_path": "deploy/kafka",
-  "kafka_executable": "kafka_2.9.2-0.8.2.0.tgz",
+  "kafka_executable": "kafka_2.10-0.10.0.1.tgz",
   "kafka_post_install_cmds": [
-    "sed -i.bak 's/SIGINT/SIGTERM/g' kafka_2.9.2-0.8.2.0/bin/kafka-server-stop.sh",
-    "sed -i.bak 's/^num\\.partitions *=.*/num.partitions=1/' kafka_2.9.2-0.8.2.0/config/server.properties",
-    "sed -i.bak 's/.*log.dirs.*/log.dirs=data/g' kafka_2.9.2-0.8.2.0/config/server.properties"
+    "sed -i.bak 's/SIGINT/SIGTERM/g' kafka_2.10-0.10.0.1/bin/kafka-server-stop.sh",
+    "sed -i.bak 's/^num\\.partitions *=.*/num.partitions=1/' kafka_2.10-0.10.0.1/config/server.properties",
+    "sed -i.bak 's/.*log.dirs.*/log.dirs=data/g' kafka_2.10-0.10.0.1/config/server.properties"
   ],
   "kafka_logs": [
     "log-cleaner.log",
-    "kafka_2.9.2-0.8.2.0/logs/controller.log",
-    "kafka_2.9.2-0.8.2.0/logs/kafka-request.log",
-    "kafka_2.9.2-0.8.2.0/logs/kafkaServer-gc.log",
-    "kafka_2.9.2-0.8.2.0/logs/server.log",
-    "kafka_2.9.2-0.8.2.0/logs/state-change.log"
+    "kafka_2.10-0.10.0.1/logs/controller.log",
+    "kafka_2.10-0.10.0.1/logs/kafka-request.log",
+    "kafka_2.10-0.10.0.1/logs/kafkaServer-gc.log",
+    "kafka_2.10-0.10.0.1/logs/server.log",
+    "kafka_2.10-0.10.0.1/logs/state-change.log"
   ]
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-test/src/main/python/configs/zookeeper.json
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/configs/zookeeper.json b/samza-test/src/main/python/configs/zookeeper.json
index 2762197..250bfce 100644
--- a/samza-test/src/main/python/configs/zookeeper.json
+++ b/samza-test/src/main/python/configs/zookeeper.json
@@ -2,13 +2,13 @@
   "zookeeper_hosts": {
     "zookeeper_instance_0": "localhost"
   },
-  "zookeeper_start_cmd": "zookeeper-3.4.3/bin/zkServer.sh start",
-  "zookeeper_stop_cmd": "zookeeper-3.4.3/bin/zkServer.sh stop",
+  "zookeeper_start_cmd": "zookeeper-3.4.6/bin/zkServer.sh start",
+  "zookeeper_stop_cmd": "zookeeper-3.4.6/bin/zkServer.sh stop",
   "zookeeper_install_path": "deploy/zookeeper",
-  "zookeeper_executable": "zookeeper-3.4.3.tar.gz",
+  "zookeeper_executable": "zookeeper-3.4.6.tar.gz",
   "zookeeper_post_install_cmds": [
-    "cp zookeeper-3.4.3/conf/zoo_sample.cfg zookeeper-3.4.3/conf/zoo.cfg",
-    "sed -i.bak 's/.*dataDir=.*/dataDir=data/g' zookeeper-3.4.3/conf/zoo.cfg"
+    "cp zookeeper-3.4.6/conf/zoo_sample.cfg zookeeper-3.4.6/conf/zoo.cfg",
+    "sed -i.bak 's/.*dataDir=.*/dataDir=data/g' zookeeper-3.4.6/conf/zoo.cfg"
   ],
   "zookeeper_logs": [
     "zookeeper.out"

http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 8d7e3fe..5d82b92 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -22,17 +22,19 @@ package org.apache.samza.test.integration
 import java.util
 import java.util.Properties
 import java.util.concurrent.{CountDownLatch, TimeUnit}
+import javax.security.auth.login.Configuration
 
 import kafka.admin.AdminUtils
 import kafka.consumer.{Consumer, ConsumerConfig}
 import kafka.message.MessageAndMetadata
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{TestUtils, TestZKUtils, Utils, ZKStringSerializer}
+import kafka.utils.{TestUtils, CoreUtils, ZkUtils}
 import kafka.zk.EmbeddedZookeeper
-import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord}
 import org.apache.samza.Partition
 import org.apache.samza.checkpoint.Checkpoint
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.security.JaasUtils
 import org.apache.samza.config.{Config, KafkaProducerConfig, MapConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.job.local.ThreadJobFactory
@@ -44,7 +46,7 @@ import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, TopicMeta
 import org.junit.Assert._
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedMap}
+import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, SynchronizedMap}
 
 /*
  * This creates an singleton instance of TestBaseStreamTask and implement the helper functions to
@@ -58,39 +60,19 @@ object StreamTaskTestUtil {
   val TOTAL_TASK_NAMES = 1
   val REPLICATION_FACTOR = 3
 
-  val zkConnect: String = TestZKUtils.zookeeperConnect
-  var zkClient: ZkClient = null
   val zkConnectionTimeout = 6000
   val zkSessionTimeout = 6000
 
-  val brokerId1 = 0
-  val brokerId2 = 1
-  val brokerId3 = 2
-  val ports = TestUtils.choosePorts(3)
-  val (port1, port2, port3) = (ports(0), ports(1), ports(2))
-
-  val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
-  val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
-  val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
-  props1.setProperty("auto.create.topics.enable","false")
-  props2.setProperty("auto.create.topics.enable","false")
-  props3.setProperty("auto.create.topics.enable","false")
-
-  val config = new util.HashMap[String, Object]()
-  val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
-  config.put("bootstrap.servers", brokers)
-  config.put("request.required.acks", "-1")
-  config.put("serializer.class", "kafka.serializer.StringEncoder")
-  config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
-  config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(Integer.MAX_VALUE-1)).toString())
-  val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+  var zkUtils: ZkUtils = null
+  var zookeeper: EmbeddedZookeeper = null
+  var brokers: String = null
+  def zkPort: Int = zookeeper.port
+  def zkConnect: String = s"127.0.0.1:$zkPort"
+
   var producer: Producer[Array[Byte], Array[Byte]] = null
   val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "123"))
   val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "12345"))
-  var zookeeper: EmbeddedZookeeper = null
-  var server1: KafkaServer = null
-  var server2: KafkaServer = null
-  var server3: KafkaServer = null
+
   var metadataStore: TopicMetadataStore = null
 
   /*
@@ -107,8 +89,8 @@ object StreamTaskTestUtil {
     "systems.kafka.samza.offset.default" -> "oldest", // applies to a nonempty topic
     "systems.kafka.consumer.auto.offset.reset" -> "smallest", // applies to an empty topic
     "systems.kafka.samza.msg.serde" -> "string",
-    "systems.kafka.consumer.zookeeper.connect" -> zkConnect,
-    "systems.kafka.producer.bootstrap.servers" -> ("localhost:%s" format port1),
+    "systems.kafka.consumer.zookeeper.connect" -> "localhost:2181",
+    "systems.kafka.producer.bootstrap.servers" -> "localhost:9092",
     // Since using state, need a checkpoint manager
     "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
     "task.checkpoint.system" -> "kafka",
@@ -122,12 +104,36 @@ object StreamTaskTestUtil {
     TestTask.reset()
   }
 
+  var servers: Buffer[KafkaServer] = null
+
   def beforeSetupServers {
-    zookeeper = new EmbeddedZookeeper(zkConnect)
-    server1 = TestUtils.createServer(new KafkaConfig(props1))
-    server2 = TestUtils.createServer(new KafkaConfig(props2))
-    server3 = TestUtils.createServer(new KafkaConfig(props3))
-    zkClient = new ZkClient(zkConnect + "/", 6000, 6000, ZKStringSerializer)
+    zookeeper = new EmbeddedZookeeper()
+    zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, JaasUtils.isZkSecurityEnabled())
+
+    val props = TestUtils.createBrokerConfigs(3, zkConnect, true)
+
+    val configs = props.map(p => {
+      p.setProperty("auto.create.topics.enable","false")
+      KafkaConfig.fromProps(p)
+    })
+
+    servers = configs.map(TestUtils.createServer(_)).toBuffer
+
+    val brokerList = TestUtils.getBrokerListStrFromServers(servers, SecurityProtocol.PLAINTEXT)
+    brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")
+
+    jobConfig ++= Map("systems.kafka.consumer.zookeeper.connect" -> zkConnect,
+      "systems.kafka.producer.bootstrap.servers" -> brokers)
+
+    val config = new util.HashMap[String, Object]()
+
+    config.put("bootstrap.servers", brokers)
+    config.put("request.required.acks", "-1")
+    config.put("serializer.class", "kafka.serializer.StringEncoder")
+    config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
+    config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(Integer.MAX_VALUE-1)).toString())
+    val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+
     producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
     metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
 
@@ -137,7 +143,7 @@ object StreamTaskTestUtil {
 
   def createTopics {
     AdminUtils.createTopic(
-      zkClient,
+      zkUtils,
       INPUT_TOPIC,
       TOTAL_TASK_NAMES,
       REPLICATION_FACTOR)
@@ -174,18 +180,15 @@ object StreamTaskTestUtil {
   }
 
   def afterCleanLogDirs {
-    producer.close()
-    server1.shutdown
-    server1.awaitShutdown()
-    server2.shutdown
-    server2.awaitShutdown()
-    server3.shutdown
-    server3.awaitShutdown()
-    Utils.rm(server1.config.logDirs)
-    Utils.rm(server2.config.logDirs)
-    Utils.rm(server3.config.logDirs)
-    zkClient.close
-    zookeeper.shutdown
+    servers.foreach(_.shutdown())
+    servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+
+    if (zkUtils != null)
+     CoreUtils.swallow(zkUtils.close())
+    if (zookeeper != null)
+      CoreUtils.swallow(zookeeper.shutdown())
+    Configuration.setConfiguration(null)
+
   }
 }