You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/09/26 01:08:09 UTC

[1/2] samza git commit: NewSystemConsumer for kafka system

Repository: samza
Updated Branches:
  refs/heads/master 078afb57f -> 003ad1068


http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
index 51545a0..59a8854 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
@@ -19,13 +19,10 @@
 
 package org.apache.samza.system.kafka
 
-import org.apache.samza.metrics.MetricsHelper
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.metrics.MetricsRegistry
 import java.util.concurrent.ConcurrentHashMap
+
 import kafka.common.TopicAndPartition
-import org.apache.samza.metrics.Counter
-import org.apache.samza.metrics.Gauge
+import org.apache.samza.metrics._
 
 class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
   val offsets = new ConcurrentHashMap[TopicAndPartition, Counter]
@@ -34,67 +31,66 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr
   val lag = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]]
   val highWatermark = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]]
 
-  /*
-   * (String, Int) = (host, port) of BrokerProxy.
-   */
-
-  val reconnects = new ConcurrentHashMap[(String, Int), Counter]
-  val brokerBytesRead = new ConcurrentHashMap[(String, Int), Counter]
-  val brokerReads = new ConcurrentHashMap[(String, Int), Counter]
-  val brokerSkippedFetchRequests = new ConcurrentHashMap[(String, Int), Counter]
-  val topicPartitions = new ConcurrentHashMap[(String, Int), Gauge[Int]]
+  val clientBytesRead = new ConcurrentHashMap[String, Counter]
+  val clientReads = new ConcurrentHashMap[String, Counter]
+  val clientSkippedFetchRequests = new ConcurrentHashMap[String, Counter]
+  val topicPartitions = new ConcurrentHashMap[String, Gauge[Int]]
 
   def registerTopicAndPartition(tp: TopicAndPartition) = {
     if (!offsets.contains(tp)) {
-      offsets.put(tp, newCounter("%s-%s-offset-change" format (tp.topic, tp.partition)))
-      bytesRead.put(tp, newCounter("%s-%s-bytes-read" format (tp.topic, tp.partition)))
-      reads.put(tp, newCounter("%s-%s-messages-read" format (tp.topic, tp.partition)))
-      highWatermark.put(tp, newGauge("%s-%s-high-watermark" format (tp.topic, tp.partition), -1L))
-      lag.put(tp, newGauge("%s-%s-messages-behind-high-watermark" format (tp.topic, tp.partition), 0L))
+      offsets.put(tp, newCounter("%s-%s-offset-change" format(tp.topic, tp.partition)))
+      bytesRead.put(tp, newCounter("%s-%s-bytes-read" format(tp.topic, tp.partition)))
+      reads.put(tp, newCounter("%s-%s-messages-read" format(tp.topic, tp.partition)))
+      highWatermark.put(tp, newGauge("%s-%s-high-watermark" format(tp.topic, tp.partition), -1L))
+      lag.put(tp, newGauge("%s-%s-messages-behind-high-watermark" format(tp.topic, tp.partition), 0L))
     }
   }
 
-  def registerBrokerProxy(host: String, port: Int) {
-    reconnects.put((host, port), newCounter("%s-%s-reconnects" format (host, port)))
-    brokerBytesRead.put((host, port), newCounter("%s-%s-bytes-read" format (host, port)))
-    brokerReads.put((host, port), newCounter("%s-%s-messages-read" format (host, port)))
-    brokerSkippedFetchRequests.put((host, port), newCounter("%s-%s-skipped-fetch-requests" format (host, port)))
-    topicPartitions.put((host, port), newGauge("%s-%s-topic-partitions" format (host, port), 0))
+  def registerClientProxy(clientName: String) {
+    clientBytesRead.put(clientName, newCounter("%s-bytes-read" format clientName))
+    clientReads.put((clientName), newCounter("%s-messages-read" format clientName))
+    clientSkippedFetchRequests.put((clientName), newCounter("%s-skipped-fetch-requests" format clientName))
+    topicPartitions.put(clientName, newGauge("%s-registered-topic-partitions" format clientName, 0))
   }
 
   // java friendlier interfaces
   // Gauges
-  def setTopicPartitionValue(host: String, port: Int, value: Int) {
-    topicPartitions.get((host,port)).set(value)
+  def setNumTopicPartitions(clientName: String, value: Int) {
+    topicPartitions.get(clientName).set(value)
   }
+
   def setLagValue(topicAndPartition: TopicAndPartition, value: Long) {
     lag.get((topicAndPartition)).set(value);
   }
+
   def setHighWatermarkValue(topicAndPartition: TopicAndPartition, value: Long) {
     highWatermark.get((topicAndPartition)).set(value);
   }
 
   // Counters
-  def incBrokerReads(host: String, port: Int) {
-    brokerReads.get((host,port)).inc
+  def incClientReads(clientName: String) {
+    clientReads.get(clientName).inc
   }
+
   def incReads(topicAndPartition: TopicAndPartition) {
     reads.get(topicAndPartition).inc;
   }
+
   def incBytesReads(topicAndPartition: TopicAndPartition, inc: Long) {
     bytesRead.get(topicAndPartition).inc(inc);
   }
-  def incBrokerBytesReads(host: String, port: Int, incBytes: Long) {
-    brokerBytesRead.get((host,port)).inc(incBytes)
+
+  def incClientBytesReads(clientName: String, incBytes: Long) {
+    clientBytesRead.get(clientName).inc(incBytes)
   }
-  def incBrokerSkippedFetchRequests(host: String, port: Int) {
-    brokerSkippedFetchRequests.get((host,port)).inc()
+
+  def incClientSkippedFetchRequests(clientName: String) {
+    clientSkippedFetchRequests.get(clientName).inc()
   }
+
   def setOffsets(topicAndPartition: TopicAndPartition, offset: Long) {
     offsets.get(topicAndPartition).set(offset)
   }
-  def incReconnects(host: String, port: Int) {
-    reconnects.get((host,port)).inc()
-  }
+
   override def getPrefix = systemName + "-"
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 9f0b5f2..ba5390b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -20,21 +20,19 @@
 package org.apache.samza.system.kafka
 
 import java.util.Properties
+
 import kafka.utils.ZkUtils
+import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.samza.SamzaException
 import org.apache.samza.config.ApplicationConfig.ApplicationMode
-import org.apache.samza.util.{Logging, KafkaUtil, ExponentialSleepStrategy, ClientUtilTopicMetadataStore}
-import org.apache.samza.config.{KafkaConfig, ApplicationConfig, StreamConfig, Config}
-import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.samza.system.SystemFactory
 import org.apache.samza.config.StorageConfig._
-import org.apache.samza.system.SystemProducer
-import org.apache.samza.system.SystemAdmin
 import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.system.SystemConsumer
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.config._
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, SystemProducer}
+import org.apache.samza.util._
 
 object KafkaSystemFactory extends Logging {
   def getInjectedProducerProperties(systemName: String, config: Config) = if (config.isChangelogSystem(systemName)) {
@@ -46,49 +44,27 @@ object KafkaSystemFactory extends Logging {
 }
 
 class KafkaSystemFactory extends SystemFactory with Logging {
+
   def getConsumer(systemName: String, config: Config, registry: MetricsRegistry): SystemConsumer = {
-    val clientId = KafkaUtil.getClientId("samza-consumer", config)
+    val clientId = KafkaConsumerConfig.getConsumerClientId( config)
     val metrics = new KafkaSystemConsumerMetrics(systemName, registry)
 
-    // Kind of goofy to need a producer config for consumers, but we need metadata.
-    val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
-    val bootstrapServers = producerConfig.bootsrapServers
-    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
+    val kafkaConsumer = KafkaSystemConsumer.getKafkaConsumerImpl(systemName, clientId, config)
+    info("Created kafka consumer for system %s, clientId %s: %s" format (systemName, clientId, kafkaConsumer))
 
-    val timeout = consumerConfig.socketTimeoutMs
-    val bufferSize = consumerConfig.socketReceiveBufferBytes
-    val fetchSize = new StreamFetchSizes(consumerConfig.fetchMessageMaxBytes, config.getFetchMessageMaxBytesTopics(systemName))
-    val consumerMinSize = consumerConfig.fetchMinBytes
-    val consumerMaxWait = consumerConfig.fetchWaitMaxMs
-    val autoOffsetResetDefault = consumerConfig.autoOffsetReset
-    val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName)
-    val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("50000").toInt
-    val fetchThresholdBytes = config.getConsumerFetchThresholdBytes(systemName).getOrElse("-1").toLong
-    val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics)
-    val metadataStore = new ClientUtilTopicMetadataStore(bootstrapServers, clientId, timeout)
-
-    new KafkaSystemConsumer(
-      systemName = systemName,
-      systemAdmin = getAdmin(systemName, config),
-      metrics = metrics,
-      metadataStore = metadataStore,
-      clientId = clientId,
-      timeout = timeout,
-      bufferSize = bufferSize,
-      fetchSize = fetchSize,
-      consumerMinSize = consumerMinSize,
-      consumerMaxWait = consumerMaxWait,
-      fetchThreshold = fetchThreshold,
-      fetchThresholdBytes = fetchThresholdBytes,
-      fetchLimitByBytesEnabled = config.isConsumerFetchThresholdBytesEnabled(systemName),
-      offsetGetter = offsetGetter)
+    val kafkaSystemConsumer = new KafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, new SystemClock)
+    info("Created samza system consumer %s" format  (kafkaSystemConsumer.toString))
+
+    kafkaSystemConsumer
   }
 
   def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = {
-    val clientId = KafkaUtil.getClientId("samza-producer", config)
+    val clientId = KafkaConsumerConfig.getProducerClientId(config)
     val injectedProps = KafkaSystemFactory.getInjectedProducerProperties(systemName, config)
     val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId, injectedProps)
-    val getProducer = () => { new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties) }
+    val getProducer = () => {
+      new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
+    }
     val metrics = new KafkaSystemProducerMetrics(systemName, registry)
 
     // Unlike consumer, no need to use encoders here, since they come for free
@@ -104,7 +80,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
   }
 
   def getAdmin(systemName: String, config: Config): SystemAdmin = {
-    val clientId = KafkaUtil.getClientId("samza-admin", config)
+    val clientId = KafkaConsumerConfig.getAdminClientId(config)
     val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
     val bootstrapServers = producerConfig.bootsrapServers
     val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
@@ -119,13 +95,13 @@ class KafkaSystemFactory extends SystemFactory with Logging {
     val coordinatorStreamReplicationFactor = config.getCoordinatorReplicationFactor.toInt
     val storeToChangelog = config.getKafkaChangelogEnabledStores()
     // Construct the meta information for each topic, if the replication factor is not defined, we use 2 as the number of replicas for the change log stream.
-    val topicMetaInformation = storeToChangelog.map{case (storeName, topicName) =>
-    {
-       val replicationFactor = config.getChangelogStreamReplicationFactor(storeName).toInt
-       val changelogInfo = ChangelogInfo(replicationFactor, config.getChangelogKafkaProperties(storeName))
-       info("Creating topic meta information for topic: %s with replication factor: %s" format (topicName, replicationFactor))
-       (topicName, changelogInfo)
-    }}
+    val topicMetaInformation = storeToChangelog.map { case (storeName, topicName) => {
+      val replicationFactor = config.getChangelogStreamReplicationFactor(storeName).toInt
+      val changelogInfo = ChangelogInfo(replicationFactor, config.getChangelogKafkaProperties(storeName))
+      info("Creating topic meta information for topic: %s with replication factor: %s" format(topicName, replicationFactor))
+      (topicName, changelogInfo)
+    }
+    }
 
     val deleteCommittedMessages = config.deleteCommittedMessages(systemName).exists(isEnabled => isEnabled.toBoolean)
     val intermediateStreamProperties: Map[String, Properties] = getIntermediateStreamProperties(config)
@@ -150,7 +126,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
       "segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, v); props }
   }
 
-  def getIntermediateStreamProperties(config : Config): Map[String, Properties] = {
+  def getIntermediateStreamProperties(config: Config): Map[String, Properties] = {
     val appConfig = new ApplicationConfig(config)
     if (appConfig.getAppMode == ApplicationMode.BATCH) {
       val streamConfig = new StreamConfig(config)

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
new file mode 100644
index 0000000..de5d093
--- /dev/null
+++ b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samza.config;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.samza.SamzaException;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestKafkaConsumerConfig {
+
+  public final static String SYSTEM_NAME = "testSystem";
+  public final static String KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer.";
+  public final static String KAFKA_CONSUMER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".consumer.";
+  private final static String CLIENT_ID = "clientId";
+
+  @Test
+  public void testDefaults() {
+    Map<String, String> props = new HashMap<>();
+
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // should be ignored
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+        "Ignore"); // should be ignored
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
+        "100"); // should NOT be ignored
+
+    props.put(JobConfig.JOB_NAME(), "jobName");
+
+    // if KAFKA_CONSUMER_PROPERTY_PREFIX is set, then PRODUCER should be ignored
+    props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "ignroeThis:9092");
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + "bootstrap.servers", "useThis:9092");
+
+    Config config = new MapConfig(props);
+    KafkaConsumerConfig kafkaConsumerConfig =
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, CLIENT_ID);
+
+    Assert.assertEquals("false", kafkaConsumerConfig.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+
+    Assert.assertEquals(KafkaConsumerConfig.DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS,
+        kafkaConsumerConfig.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+
+    Assert.assertEquals(RangeAssignor.class.getName(),
+        kafkaConsumerConfig.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
+
+    Assert.assertEquals("useThis:9092", kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+    Assert.assertEquals("100", kafkaConsumerConfig.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+
+    Assert.assertEquals(ByteArrayDeserializer.class.getName(),
+        kafkaConsumerConfig.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+
+    Assert.assertEquals(ByteArrayDeserializer.class.getName(),
+        kafkaConsumerConfig.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
+
+    Assert.assertEquals(CLIENT_ID, kafkaConsumerConfig.get(ConsumerConfig.CLIENT_ID_CONFIG));
+
+    Assert.assertEquals(KafkaConsumerConfig.getConsumerGroupId(config),
+        kafkaConsumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG));
+
+    Assert.assertEquals(KafkaConsumerConfig.CONSUMER_CLIENT_ID_PREFIX.replace("-", "_") + "-jobName-1",
+        KafkaConsumerConfig.getConsumerClientId(config));
+    Assert.assertEquals("jobName-1", KafkaConsumerConfig.getConsumerGroupId(config));
+
+    props.put(JobConfig.JOB_ID(), "jobId");
+    config = new MapConfig(props);
+
+    Assert.assertEquals(KafkaConsumerConfig.CONSUMER_CLIENT_ID_PREFIX.replace("-", "_") + "-jobName-jobId",
+        KafkaConsumerConfig.getConsumerClientId(config));
+    Assert.assertEquals("jobName-jobId", KafkaConsumerConfig.getConsumerGroupId(config));
+  }
+
+  // test stuff that should not be overridden
+  @Test
+  public void testNotOverride() {
+    Map<String, String> props = new HashMap<>();
+
+    // if KAFKA_CONSUMER_PROPERTY_PREFIX is not set, then PRODUCER should be used
+    props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "useThis:9092");
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+        TestKafkaConsumerConfig.class.getName());
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+        TestKafkaConsumerConfig.class.getName());
+
+    props.put(JobConfig.JOB_NAME(), "jobName");
+
+    Config config = new MapConfig(props);
+    KafkaConsumerConfig kafkaConsumerConfig =
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, CLIENT_ID);
+
+    Assert.assertEquals("useThis:9092", kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+
+    Assert.assertEquals(TestKafkaConsumerConfig.class.getName(),
+        kafkaConsumerConfig.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+
+    Assert.assertEquals(TestKafkaConsumerConfig.class.getName(),
+        kafkaConsumerConfig.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
+  }
+
+  @Test
+  public void testGetConsumerClientId() {
+    Map<String, String> map = new HashMap<>();
+
+    map.put(JobConfig.JOB_NAME(), "jobName");
+    map.put(JobConfig.JOB_ID(), "jobId");
+    String result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
+    Assert.assertEquals("consumer-jobName-jobId", result);
+
+    result = KafkaConsumerConfig.getConsumerClientId("consumer-", new MapConfig(map));
+    Assert.assertEquals("consumer_-jobName-jobId", result);
+
+    result = KafkaConsumerConfig.getConsumerClientId("super-duper-consumer", new MapConfig(map));
+    Assert.assertEquals("super_duper_consumer-jobName-jobId", result);
+
+    map.put(JobConfig.JOB_NAME(), " very important!job");
+    result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
+    Assert.assertEquals("consumer-_very_important_job-jobId", result);
+
+    map.put(JobConfig.JOB_ID(), "number-#3");
+    result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
+    Assert.assertEquals("consumer-_very_important_job-number__3", result);
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testNoBootstrapServers() {
+    KafkaConsumerConfig kafkaConsumerConfig =
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(new MapConfig(Collections.emptyMap()), SYSTEM_NAME,
+            "clientId");
+
+    Assert.fail("didn't get exception for the missing config:" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index 77f47f9..7e968bf 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -19,17 +19,14 @@
 
 package org.apache.samza.system.kafka;
 
-import java.util.*;
 import java.util.HashMap;
 import java.util.Map;
-
+import java.util.Properties;
 import kafka.api.TopicMetadata;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.StreamValidationException;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.util.ScalaJavaUtil;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -71,7 +68,6 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
 
     admin.createStream(spec);
     admin.validateStream(spec);
-
   }
 
   @Test
@@ -143,7 +139,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
   public void testCreateStream() {
     StreamSpec spec = new StreamSpec("testId", "testStream", "testSystem", 8);
 
-    assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec));
+    assertTrue("createStream should return true if the stream does not exist and then is created.",
+        systemAdmin().createStream(spec));
     systemAdmin().validateStream(spec);
 
     assertFalse("createStream should return false if the stream already exists.", systemAdmin().createStream(spec));
@@ -162,7 +159,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
     StreamSpec spec1 = new StreamSpec("testId", "testStreamPartition", "testSystem", 8);
     StreamSpec spec2 = new StreamSpec("testId", "testStreamPartition", "testSystem", 4);
 
-    assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec1));
+    assertTrue("createStream should return true if the stream does not exist and then is created.",
+        systemAdmin().createStream(spec1));
 
     systemAdmin().validateStream(spec2);
   }
@@ -172,7 +170,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
     StreamSpec spec1 = new StreamSpec("testId", "testStreamName1", "testSystem", 8);
     StreamSpec spec2 = new StreamSpec("testId", "testStreamName2", "testSystem", 8);
 
-    assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec1));
+    assertTrue("createStream should return true if the stream does not exist and then is created.",
+        systemAdmin().createStream(spec1));
 
     systemAdmin().validateStream(spec2);
   }
@@ -181,7 +180,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
   public void testClearStream() {
     StreamSpec spec = new StreamSpec("testId", "testStreamClear", "testSystem", 8);
 
-    assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec));
+    assertTrue("createStream should return true if the stream does not exist and then is created.",
+        systemAdmin().createStream(spec));
     assertTrue(systemAdmin().clearStream(spec));
 
     scala.collection.immutable.Set<String> topic = new scala.collection.immutable.Set.Set1<>(spec.getPhysicalName());

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
deleted file mode 100644
index d510076..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ /dev/null
@@ -1,434 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.samza.system.kafka
-
-import java.nio.ByteBuffer
-import java.util.concurrent.CountDownLatch
-
-import kafka.api.{PartitionOffsetsResponse, _}
-import kafka.common.TopicAndPartition
-import kafka.consumer.SimpleConsumer
-import kafka.message.{ByteBufferMessageSet, Message, MessageAndOffset, MessageSet}
-import org.apache.kafka.common.protocol.Errors
-import org.apache.samza.SamzaException
-import org.apache.samza.util.Logging
-import org.junit.Assert._
-import org.junit._
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
-import org.mockito.{Matchers, Mockito}
-
-import scala.collection.JavaConverters._
-
-class TestBrokerProxy extends Logging {
-  val tp2 = new TopicAndPartition("Redbird", 2013)
-  var fetchTp1 = true // control whether fetching tp1 messages or not
-
-  @Test def brokerProxyRetrievesMessagesCorrectly() = {
-    val (bp, tp, sink) = getMockBrokerProxy()
-
-    bp.start
-    bp.addTopicPartition(tp, Option("0"))
-    // Add tp2, which should never receive messages since sink disables it.
-    bp.addTopicPartition(tp2, Option("0"))
-    Thread.sleep(1000)
-    assertEquals(2, sink.receivedMessages.size)
-    assertEquals(42, sink.receivedMessages(0)._2.offset)
-    assertEquals(84, sink.receivedMessages(1)._2.offset)
-  }
-
-  @Test def brokerProxySkipsFetchForEmptyRequests() = {
-    val (bp, tp, sink) = getMockBrokerProxy()
-
-    bp.start
-    // Only add tp2, which should never receive messages since sink disables it.
-    bp.addTopicPartition(tp2, Option("0"))
-    Thread.sleep(1000)
-    assertEquals(0, sink.receivedMessages.size)
-    assertTrue(bp.metrics.brokerSkippedFetchRequests.get((bp.host, bp.port)).getCount > 0)
-    assertEquals(0, bp.metrics.brokerReads.get((bp.host, bp.port)).getCount)
-  }
-
-  @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = {
-    val (bp, tp, _) = getMockBrokerProxy()
-    bp.start
-    bp.addTopicPartition(tp, Option("0"))
-
-    try {
-      bp.addTopicPartition(tp, Option("1"))
-      fail("Should have thrown an exception")
-    } catch {
-      case se: SamzaException => assertEquals(se.getMessage, "Already consuming TopicPartition [Redbird,2012]")
-      case other: Exception => fail("Got some other exception than what we were expecting: " + other)
-    }
-  }
-
-  def getMockBrokerProxy() = {
-    val sink = new MessageSink {
-      val receivedMessages = new scala.collection.mutable.ListBuffer[(TopicAndPartition, MessageAndOffset, Boolean)]()
-
-      def abdicate(tp: TopicAndPartition, nextOffset: Long) {}
-
-      def refreshDropped() {}
-
-      def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {
-        receivedMessages += ((tp, msg, msg.offset.equals(highWatermark)))
-      }
-
-      def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
-      }
-
-      // Never need messages for tp2.
-      def needsMoreMessages(tp: TopicAndPartition): Boolean = !tp.equals(tp2) && fetchTp1
-    }
-
-    val system = "daSystem"
-    val host = "host"
-    val port = 2222
-    val tp = new TopicAndPartition("Redbird", 2012)
-    val metrics = new KafkaSystemConsumerMetrics(system)
-
-    metrics.registerBrokerProxy(host, port)
-    metrics.registerTopicAndPartition(tp)
-    metrics.topicPartitions.get((host, port)).set(1)
-
-    val bp = new BrokerProxy(
-      host,
-      port,
-      system,
-      "daClientId",
-      metrics,
-      sink,
-      offsetGetter = new GetOffset("fail", Map("Redbird" -> "largest"))) {
-
-      override val sleepMSWhileNoTopicPartitions = 100
-      // Speed up for test
-      var alreadyCreatedConsumer = false
-
-      // Scala traits and Mockito mocks don't mix, unfortunately.
-      override def createSimpleConsumer() = {
-        if (alreadyCreatedConsumer) {
-          System.err.println("Should only be creating one consumer in this test!")
-          throw new InterruptedException("Should only be creating one consumer in this test!")
-        }
-        alreadyCreatedConsumer = true
-
-        new DefaultFetchSimpleConsumer("a", 1, 2, 3, "b", new StreamFetchSizes(42)) {
-          val sc = Mockito.mock(classOf[SimpleConsumer])
-          val mockOffsetResponse = {
-            val offsetResponse = Mockito.mock(classOf[OffsetResponse])
-            val partitionOffsetResponse = {
-              val por = Mockito.mock(classOf[PartitionOffsetsResponse])
-              when(por.offsets).thenReturn(List(1l).toSeq)
-              por
-            }
-
-            val map = scala.Predef.Map[TopicAndPartition, PartitionOffsetsResponse](tp -> partitionOffsetResponse, tp2 -> partitionOffsetResponse)
-            when(offsetResponse.partitionErrorAndOffsets).thenReturn(map)
-            offsetResponse
-          }
-
-          when(sc.getOffsetsBefore(any(classOf[OffsetRequest]))).thenReturn(mockOffsetResponse)
-
-          val fetchResponse = {
-            val fetchResponse = Mockito.mock(classOf[FetchResponse])
-
-            val messageSet = {
-              val messageSet = Mockito.mock(classOf[ByteBufferMessageSet])
-
-              def getMessage() = new Message(Mockito.mock(classOf[ByteBuffer]))
-              val messages = List(new MessageAndOffset(getMessage, 42), new MessageAndOffset(getMessage, 84))
-
-              when(messageSet.sizeInBytes).thenReturn(43)
-              when(messageSet.size).thenReturn(44)
-              when(messageSet.iterator).thenReturn(messages.iterator)
-              when(messageSet.head).thenReturn(messages.head)
-              messageSet
-            }
-
-            val fetchResponsePartitionData = FetchResponsePartitionData(Errors.NONE, 500, messageSet)
-            val map = scala.Predef.Map[TopicAndPartition, FetchResponsePartitionData](tp -> fetchResponsePartitionData)
-
-            when(fetchResponse.data).thenReturn(map.toSeq)
-            when(fetchResponse.messageSet(any(classOf[String]), any(classOf[Int]))).thenReturn(messageSet)
-            fetchResponse
-          }
-          when(sc.fetch(any(classOf[FetchRequest]))).thenReturn(fetchResponse)
-
-          override def close() = sc.close()
-
-          override def send(request: TopicMetadataRequest): TopicMetadataResponse = sc.send(request)
-
-          override def fetch(request: FetchRequest): FetchResponse = {
-            // Verify that we only get fetch requests for one tp, even though
-            // two were registered. This is to verify that
-            // sink.needsMoreMessages works.
-            assertEquals(1, request.requestInfo.size)
-            sc.fetch(request)
-          }
-
-          when(sc.earliestOrLatestOffset(any(classOf[TopicAndPartition]), any(classOf[Long]), any(classOf[Int]))).thenReturn(100)
-
-          override def getOffsetsBefore(request: OffsetRequest): OffsetResponse = sc.getOffsetsBefore(request)
-
-          override def commitOffsets(request: OffsetCommitRequest): OffsetCommitResponse = sc.commitOffsets(request)
-
-          override def fetchOffsets(request: OffsetFetchRequest): OffsetFetchResponse = sc.fetchOffsets(request)
-
-          override def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = sc.earliestOrLatestOffset(topicAndPartition, earliestOrLatest, consumerId)
-        }
-      }
-
-    }
-
-    (bp, tp, sink)
-  }
-
-  @Test def brokerProxyUpdateLatencyMetrics() = {
-    val (bp, tp, _) = getMockBrokerProxy()
-
-    bp.start
-    bp.addTopicPartition(tp, Option("0"))
-    Thread.sleep(1000)
-    // update when fetching messages
-    assertEquals(500, bp.metrics.highWatermark.get(tp).getValue)
-    assertEquals(415, bp.metrics.lag.get(tp).getValue)
-
-    fetchTp1 = false
-    Thread.sleep(1000)
-    // update when not fetching messages
-    assertEquals(100, bp.metrics.highWatermark.get(tp).getValue)
-    assertEquals(15, bp.metrics.lag.get(tp).getValue)
-
-    fetchTp1 = true
-  }
-
- @Test def brokerProxyCorrectlyHandlesOffsetOutOfRange(): Unit = {
-    // Need to wait for the thread to do some work before ending the test
-    val countdownLatch = new CountDownLatch(1)
-    var failString: String = null
-
-    val mockMessageSink = mock(classOf[MessageSink])
-    when(mockMessageSink.needsMoreMessages(any())).thenReturn(true)
-
-    val doNothingMetrics = new KafkaSystemConsumerMetrics()
-
-    val tp = new TopicAndPartition("topic", 42)
-
-    val mockOffsetGetter = mock(classOf[GetOffset])
-    // This will be used by the simple consumer below, and this is the response that simple consumer needs
-    when(mockOffsetGetter.isValidOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp), Matchers.eq("0"))).thenReturn(true)
-    when(mockOffsetGetter.getResetOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp))).thenReturn(1492l)
-
-    var callsToCreateSimpleConsumer = 0
-    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
-
-    // Create an answer that first indicates offset out of range on first invocation and on second
-    // verifies that the parameters have been updated to what we expect them to be
-    val answer = new Answer[FetchResponse]() {
-      var invocationCount = 0
-
-      def answer(invocation: InvocationOnMock): FetchResponse = {
-        val arguments = invocation.getArguments()(0).asInstanceOf[List[Object]](0).asInstanceOf[(String, Long)]
-
-        if (invocationCount == 0) {
-          if (arguments !=(tp, 0)) {
-            failString = "First invocation did not have the right arguments: " + arguments
-            countdownLatch.countDown()
-          }
-          val mfr = mock(classOf[FetchResponse])
-          when(mfr.hasError).thenReturn(true)
-          when(mfr.error("topic", 42)).thenReturn(Errors.OFFSET_OUT_OF_RANGE)
-
-          val messageSet = mock(classOf[MessageSet])
-          when(messageSet.iterator).thenReturn(Iterator.empty)
-          val response = mock(classOf[FetchResponsePartitionData])
-          when(response.error).thenReturn(Errors.OFFSET_OUT_OF_RANGE)
-          val responseMap = Map(tp -> response)
-          when(mfr.data).thenReturn(responseMap.toSeq)
-          invocationCount += 1
-          mfr
-        } else {
-          if (arguments !=(tp, 1492)) {
-            failString = "On second invocation, arguments were not correct: " + arguments
-          }
-          countdownLatch.countDown()
-          Thread.currentThread().interrupt()
-          null
-        }
-      }
-    }
-
-    when(mockSimpleConsumer.defaultFetch(any())).thenAnswer(answer)
-
-    // So now we have a fetch response that will fail.  Prime the mockGetOffset to send us to a new offset
-
-    val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
-
-      override def createSimpleConsumer() = {
-        if (callsToCreateSimpleConsumer > 1) {
-          failString = "Tried to create more than one simple consumer"
-          countdownLatch.countDown()
-        }
-        callsToCreateSimpleConsumer += 1
-        mockSimpleConsumer
-      }
-    }
-
-    bp.addTopicPartition(tp, Option("0"))
-    bp.start
-    countdownLatch.await()
-    bp.stop
-    if (failString != null) {
-      fail(failString)
-    }
-  }
-
-  /**
-   * Test that makes sure that BrokerProxy abdicates all TopicAndPartitions
-   * that it owns when a consumer failure occurs.
-   */
-  @Test def brokerProxyAbdicatesOnConnectionFailure(): Unit = {
-    val countdownLatch = new CountDownLatch(1)
-    var abdicated: Option[TopicAndPartition] = None
-    @volatile var refreshDroppedCount = 0
-    val mockMessageSink = new MessageSink {
-      override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
-      }
-
-      override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {
-      }
-
-      override def abdicate(tp: TopicAndPartition, nextOffset: Long) {
-        abdicated = Some(tp)
-        countdownLatch.countDown
-      }
-
-      override def refreshDropped() {
-        refreshDroppedCount += 1
-      }
-
-      override def needsMoreMessages(tp: TopicAndPartition): Boolean = {
-        true
-      }
-    }
-
-    val doNothingMetrics = new KafkaSystemConsumerMetrics()
-    val tp = new TopicAndPartition("topic", 42)
-    val mockOffsetGetter = mock(classOf[GetOffset])
-    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
-
-    when(mockOffsetGetter.isValidOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp), Matchers.eq("0"))).thenReturn(true)
-    when(mockOffsetGetter.getResetOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp))).thenReturn(1492l)
-    when(mockSimpleConsumer.defaultFetch(any())).thenThrow(new SamzaException("Pretend this is a ClosedChannelException. Can't use ClosedChannelException because it's checked, and Mockito doesn't like that."))
-
-    val bp = new BrokerProxy("host", 567, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
-      override def createSimpleConsumer() = {
-        mockSimpleConsumer
-      }
-    }
-
-    val waitForRefresh = () => {
-      val currentRefreshDroppedCount = refreshDroppedCount
-      while (refreshDroppedCount == currentRefreshDroppedCount) {
-        Thread.sleep(100)
-      }
-    }
-
-    bp.addTopicPartition(tp, Option("0"))
-    bp.start
-    // BP should refresh on startup.
-    waitForRefresh()
-    countdownLatch.await()
-    // BP should continue refreshing after it's abdicated all TopicAndPartitions.
-    waitForRefresh()
-    bp.stop
-    assertEquals(tp, abdicated.getOrElse(null))
-  }
-
-  @Test def brokerProxyAbdicatesHardErrors(): Unit = {
-    val doNothingMetrics = new KafkaSystemConsumerMetrics
-    val mockMessageSink = new MessageSink {
-      override def needsMoreMessages(tp: TopicAndPartition): Boolean = true
-      override def abdicate(tp: TopicAndPartition, nextOffset: Long) {}
-      override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {}
-      override def refreshDropped() {throw new OutOfMemoryError("Test - OOME")}
-      override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean): Unit = {}
-    }
-    val mockOffsetGetter = mock(classOf[GetOffset])
-    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
-
-    val bp = new BrokerProxy("host", 658, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
-      override def createSimpleConsumer() = {
-        mockSimpleConsumer
-      }
-    }
-    var caughtError = false
-    try {
-      bp.thread.run
-    } catch {
-      case e: SamzaException => {
-        assertEquals(e.getMessage, "Got out of memory error in broker proxy thread.")
-        info("Received OutOfMemoryError in broker proxy.")
-        caughtError = true
-      }
-    }
-    assertEquals(true, caughtError)
-    val mockMessageSink2 = new MessageSink {
-      override def needsMoreMessages(tp: TopicAndPartition): Boolean = true
-      override def abdicate(tp: TopicAndPartition, nextOffset: Long): Unit = {}
-      override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long): Unit = {}
-      override def refreshDropped(): Unit = {throw new StackOverflowError("Test - SOE")}
-      override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean): Unit = {}
-    }
-    caughtError = false
-    val bp2 = new BrokerProxy("host", 689, "system", "clientID2", doNothingMetrics, mockMessageSink2, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
-      override def createSimpleConsumer() = {
-        mockSimpleConsumer
-      }
-    }
-    try {
-      bp2.thread.run
-    } catch {
-      case e: SamzaException => {
-        assertEquals(e.getMessage, "Got stack overflow error in broker proxy thread.")
-        info("Received StackOverflowError in broker proxy.")
-        caughtError = true
-      }
-    }
-    assertEquals(true, caughtError)
-  }
-
-  @Test
-	def brokerProxyStopCloseConsumer: Unit = {
-    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
-    val bp = new BrokerProxy("host", 0, "system", "clientID", new KafkaSystemConsumerMetrics(), null){
-      override def createSimpleConsumer() = {
-        mockSimpleConsumer
-      }
-    }
-    bp.start
-    bp.stop
-    verify(mockSimpleConsumer).close
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
new file mode 100644
index 0000000..5791545
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
@@ -0,0 +1,220 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.KafkaConsumerConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class TestKafkaSystemConsumer {
+  public final String TEST_SYSTEM = "test-system";
+  public final String TEST_STREAM = "test-stream";
+  public final String TEST_CLIENT_ID = "testClientId";
+  public final String BOOTSTRAP_SERVER = "127.0.0.1:8888";
+  public final String FETCH_THRESHOLD_MSGS = "50000";
+  public final String FETCH_THRESHOLD_BYTES = "100000";
+
+  private KafkaSystemConsumer createConsumer(String fetchMsg, String fetchBytes) {
+    final Map<String, String> map = new HashMap<>();
+
+    map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD(), TEST_SYSTEM), fetchMsg);
+    map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES(), TEST_SYSTEM), fetchBytes);
+    map.put(String.format("systems.%s.consumer.%s", TEST_SYSTEM, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
+        BOOTSTRAP_SERVER);
+    map.put(JobConfig.JOB_NAME(), "jobName");
+
+    Config config = new MapConfig(map);
+    KafkaConsumerConfig consumerConfig =
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, TEST_CLIENT_ID);
+    final KafkaConsumer<byte[], byte[]> kafkaConsumer = new MockKafkaConsumer(consumerConfig);
+
+    MockKafkaSystemConsumer newKafkaSystemConsumer =
+        new MockKafkaSystemConsumer(kafkaConsumer, TEST_SYSTEM, config, TEST_CLIENT_ID,
+            new KafkaSystemConsumerMetrics(TEST_SYSTEM, new NoOpMetricsRegistry()), System::currentTimeMillis);
+
+    return newKafkaSystemConsumer;
+  }
+
+  @Test
+  public void testConfigValidations() {
+
+    final KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+
+    consumer.start();
+    // should be no failures
+  }
+
+  @Test
+  public void testFetchThresholdShouldDivideEvenlyAmongPartitions() {
+    final KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+    final int partitionsNum = 50;
+    for (int i = 0; i < partitionsNum; i++) {
+      consumer.register(new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(i)), "0");
+    }
+
+    consumer.start();
+
+    Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum, consumer.perPartitionFetchThreshold);
+    Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_BYTES) / 2 / partitionsNum,
+        consumer.perPartitionFetchThresholdBytes);
+
+    consumer.stop();
+  }
+
+  @Test
+  public void testConsumerRegisterOlderOffsetOfTheSamzaSSP() {
+
+    KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+
+    SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
+    SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
+    SystemStreamPartition ssp2 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(2));
+
+    consumer.register(ssp0, "0");
+    consumer.register(ssp0, "5");
+    consumer.register(ssp1, "2");
+    consumer.register(ssp1, "3");
+    consumer.register(ssp2, "0");
+
+    assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp0)));
+    assertEquals("2", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp1)));
+    assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp2)));
+  }
+
+  @Test
+  public void testFetchThresholdBytes() {
+
+    SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
+    SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
+    int partitionsNum = 2;
+    int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // fake size
+    int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 1; // fake size
+    int ime11Size = 20;
+    ByteArraySerializer bytesSerde = new ByteArraySerializer();
+    IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", bytesSerde.serialize("", "key0".getBytes()),
+        bytesSerde.serialize("", "value0".getBytes()), ime0Size);
+    IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key1".getBytes()),
+        bytesSerde.serialize("", "value1".getBytes()), ime1Size);
+    IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key11".getBytes()),
+        bytesSerde.serialize("", "value11".getBytes()), ime11Size);
+    KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+
+    consumer.register(ssp0, "0");
+    consumer.register(ssp1, "0");
+    consumer.start();
+    consumer.messageSink.addMessage(ssp0, ime0);
+    // queue for ssp0 should be full now, because we added message of size FETCH_THRESHOLD_MSGS/partitionsNum
+    Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp0));
+    consumer.messageSink.addMessage(ssp1, ime1);
+    // queue for ssp1 should be less then full now, because we added message of size (FETCH_THRESHOLD_MSGS/partitionsNum - 1)
+    Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1));
+    consumer.messageSink.addMessage(ssp1, ime11);
+    // queue for ssp1 should full now, because we added message of size 20 on top
+    Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1));
+
+    Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0));
+    Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
+    Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0));
+    Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1));
+
+    consumer.stop();
+  }
+
+  @Test
+  public void testFetchThresholdBytesDiabled() {
+    // Pass 0 as fetchThresholdByBytes, which disables checking for limit by size
+
+    SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
+    SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
+    int partitionsNum = 2;
+    int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // fake size, upto the limit
+    int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 100; // fake size, below the limit
+    int ime11Size = 20;// event with the second message still below the size limit
+    ByteArraySerializer bytesSerde = new ByteArraySerializer();
+    IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", bytesSerde.serialize("", "key0".getBytes()),
+        bytesSerde.serialize("", "value0".getBytes()), ime0Size);
+    IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key1".getBytes()),
+        bytesSerde.serialize("", "value1".getBytes()), ime1Size);
+    IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key11".getBytes()),
+        bytesSerde.serialize("", "value11".getBytes()), ime11Size);
+
+    // limit by number of messages 4/2 = 2 per partition
+    // limit by number of bytes - disabled
+    KafkaSystemConsumer consumer = createConsumer("4", "0"); // should disable
+
+    consumer.register(ssp0, "0");
+    consumer.register(ssp1, "0");
+    consumer.start();
+    consumer.messageSink.addMessage(ssp0, ime0);
+    // should be full by size, but not full by number of messages (1 of 2)
+    Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp0));
+    consumer.messageSink.addMessage(ssp1, ime1);
+    // not full neither by size nor by messages
+    Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1));
+    consumer.messageSink.addMessage(ssp1, ime11);
+    // not full by size, but should be full by messages
+    Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1));
+
+    Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0));
+    Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
+    Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0));
+    Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1));
+
+    consumer.stop();
+  }
+
+  // mock kafkaConsumer and SystemConsumer
+  static class MockKafkaConsumer extends KafkaConsumer {
+    public MockKafkaConsumer(Map<String, Object> configs) {
+      super(configs);
+    }
+  }
+
+  static class MockKafkaSystemConsumer extends KafkaSystemConsumer {
+    public MockKafkaSystemConsumer(Consumer kafkaConsumer, String systemName, Config config, String clientId,
+        KafkaSystemConsumerMetrics metrics, Clock clock) {
+      super(kafkaConsumer, systemName, config, clientId, metrics, clock);
+    }
+
+    @Override
+    void startConsumer() {
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
deleted file mode 100644
index 8656d10..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system.kafka
-
-import kafka.api.TopicMetadata
-import kafka.api.PartitionMetadata
-import kafka.cluster.Broker
-import kafka.common.TopicAndPartition
-import kafka.message.Message
-import kafka.message.MessageAndOffset
-import org.apache.kafka.common.protocol.Errors
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.Partition
-import org.apache.samza.util.TopicMetadataStore
-import org.junit.Test
-import org.junit.Assert._
-import org.apache.samza.system.SystemAdmin
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-
-class TestKafkaSystemConsumer {
-  val systemAdmin: SystemAdmin = mock(classOf[KafkaSystemAdmin])
-  private val SSP: SystemStreamPartition = new SystemStreamPartition("test", "test", new Partition(0))
-  private val envelope: IncomingMessageEnvelope = new IncomingMessageEnvelope(SSP, null, null, null)
-  private val envelopeWithSize: IncomingMessageEnvelope = new IncomingMessageEnvelope(SSP, null, null, null, 100)
-  private val clientId = "TestClientId"
-
-  @Test
-  def testFetchThresholdShouldDivideEvenlyAmongPartitions {
-    val metadataStore = new MockMetadataStore
-    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId, fetchThreshold = 50000) {
-      override def refreshBrokers {
-      }
-    }
-
-    for (i <- 0 until 50) {
-      consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
-    }
-
-    consumer.start
-
-    assertEquals(1000, consumer.perPartitionFetchThreshold)
-  }
-
-  @Test
-  def testBrokerCreationShouldTriggerStart {
-    val systemName = "test-system"
-    val streamName = "test-stream"
-    val metrics = new KafkaSystemConsumerMetrics
-    // Lie and tell the store that the partition metadata is empty. We can't
-    // use partition metadata because it has Broker in its constructor, which
-    // is package private to Kafka.
-    val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, Errors.NONE)))
-    var hosts = List[String]()
-    var getHostPortCount = 0
-    val consumer = new KafkaSystemConsumer(systemName, systemAdmin, metrics, metadataStore, clientId) {
-      override def getLeaderHostPort(partitionMetadata: Option[PartitionMetadata]): Option[(String, Int)] = {
-        // Generate a unique host every time getHostPort is called.
-        getHostPortCount += 1
-        Some("localhost-%s" format getHostPortCount, 0)
-      }
-
-      override def createBrokerProxy(host: String, port: Int): BrokerProxy = {
-        new BrokerProxy(host, port, systemName, "", metrics, sink) {
-          override def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
-            // Skip this since we normally do verification of offsets, which
-            // tries to connect to Kafka. Rather than mock that, just forget it.
-            nextOffsets.size
-          }
-
-          override def start {
-            hosts :+= host
-          }
-        }
-      }
-    }
-
-    consumer.register(new SystemStreamPartition(systemName, streamName, new Partition(0)), "1")
-    assertEquals(0, hosts.size)
-    consumer.start
-    assertEquals(List("localhost-1"), hosts)
-    // Should trigger a refresh with a new host.
-    consumer.sink.abdicate(new TopicAndPartition(streamName, 0), 2)
-    assertEquals(List("localhost-1", "localhost-2"), hosts)
-  }
-
-  @Test
-  def testConsumerRegisterOlderOffsetOfTheSamzaSSP {
-    when(systemAdmin.offsetComparator(anyString, anyString)).thenCallRealMethod()
-
-    val metadataStore = new MockMetadataStore
-    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId, fetchThreshold = 50000)
-    val ssp0 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
-    val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(1))
-    val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(2))
-
-    consumer.register(ssp0, "0")
-    consumer.register(ssp0, "5")
-    consumer.register(ssp1, "2")
-    consumer.register(ssp1, "3")
-    consumer.register(ssp2, "0")
-
-    assertEquals("0", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp0)))
-    assertEquals("2", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp1)))
-    assertEquals("0", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp2)))
-  }
-
-  @Test
-  def testFetchThresholdBytesShouldDivideEvenlyAmongPartitions {
-    val metadataStore = new MockMetadataStore
-    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId,
-      fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) {
-      override def refreshBrokers {
-      }
-    }
-
-    for (i <- 0 until 10) {
-      consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
-    }
-
-    consumer.start
-
-    assertEquals(5000, consumer.perPartitionFetchThreshold)
-    assertEquals(3000, consumer.perPartitionFetchThresholdBytes)
-  }
-
-  @Test
-  def testFetchThresholdBytes {
-    val metadataStore = new MockMetadataStore
-    val consumer = new KafkaSystemConsumer("test-system", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId,
-      fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) {
-      override def refreshBrokers {
-      }
-    }
-
-    for (i <- 0 until 10) {
-      consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
-    }
-
-    consumer.start
-
-    val msg = Array[Byte](5, 112, 9, 126)
-    val msgAndOffset: MessageAndOffset = MessageAndOffset(new Message(msg), 887654)
-    // 4 data + 18 Message overhead + 80 IncomingMessageEnvelope overhead
-    consumer.sink.addMessage(new TopicAndPartition("test-stream", 0),  msgAndOffset, 887354)
-
-    assertEquals(106, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0))))
-  }
-
-  @Test
-  def testFetchThresholdBytesDisabled {
-    val metadataStore = new MockMetadataStore
-    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId,
-      fetchThreshold = 50000, fetchThresholdBytes = 60000L) {
-      override def refreshBrokers {
-      }
-    }
-
-    for (i <- 0 until 10) {
-      consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
-    }
-
-    consumer.start
-
-    assertEquals(5000, consumer.perPartitionFetchThreshold)
-    assertEquals(0, consumer.perPartitionFetchThresholdBytes)
-    assertEquals(0, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0))))
-  }
-}
-
-class MockMetadataStore(var metadata: Map[String, TopicMetadata] = Map()) extends TopicMetadataStore {
-  def getTopicInfo(topics: Set[String]): Map[String, TopicMetadata] = metadata
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 864d2e5..8405c63 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.samza.config._
 import org.apache.samza.container.TaskName
-import org.apache.samza.job.local.ThreadJobFactory
+import org.apache.samza.job.local.{ThreadJob, ThreadJobFactory}
 import org.apache.samza.job.model.{ContainerModel, JobModel}
 import org.apache.samza.job.{ApplicationStatus, JobRunner, StreamJob}
 import org.apache.samza.metrics.MetricsRegistryMap
@@ -223,6 +223,13 @@ class StreamTaskTestUtil {
    * interrupt, which is forwarded on to ThreadJob, and marked as a failure).
    */
   def stopJob(job: StreamJob) {
+    // make sure we don't kill the job before it was started.
+    // eventProcesses guarantees all the consumers have been initialized
+    val tasks = TestTask.tasks
+    val task = tasks.values.toList.head
+    task.eventProcessed.await(60, TimeUnit.SECONDS)
+    assertEquals(0, task.eventProcessed.getCount)
+
     // Shutdown task.
     job.kill
     val status = job.waitForFinish(60000)
@@ -279,7 +286,10 @@ class StreamTaskTestUtil {
     val taskConfig = new TaskConfig(jobModel.getConfig)
     val checkpointManager = taskConfig.getCheckpointManager(new MetricsRegistryMap())
     checkpointManager match {
-      case Some(checkpointManager) => checkpointManager.createResources
+      case Some(checkpointManager) => {
+        checkpointManager.createResources
+        checkpointManager.stop
+      }
       case _ => assert(checkpointManager != null, "No checkpoint manager factory configured")
     }
 
@@ -323,6 +333,7 @@ object TestTask {
 abstract class TestTask extends StreamTask with InitableTask {
   var received = ArrayBuffer[String]()
   val initFinished = new CountDownLatch(1)
+  val eventProcessed = new CountDownLatch(1)
   @volatile var gotMessage = new CountDownLatch(1)
 
   def init(config: Config, context: TaskContext) {
@@ -334,6 +345,8 @@ abstract class TestTask extends StreamTask with InitableTask {
   def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
     val msg = envelope.getMessage.asInstanceOf[String]
 
+    eventProcessed.countDown()
+
     System.err.println("TestTask.process(): %s" format msg)
 
     received += msg

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
index e4d47d1..ccb7cd4 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
@@ -77,7 +77,6 @@ class TestShutdownStatefulTask extends StreamTaskTestUtil {
     val (job, task) = startJob
 
     // Validate that restored is empty.
-    assertEquals(0, task.initFinished.getCount)
     assertEquals(0, task.asInstanceOf[ShutdownStateStoreTask].restored.size)
     assertEquals(0, task.received.size)
 
@@ -88,7 +87,6 @@ class TestShutdownStatefulTask extends StreamTaskTestUtil {
     send(task, "2")
     send(task, "99")
     send(task, "99")
-
     stopJob(job)
 
   }
@@ -120,7 +118,7 @@ class ShutdownStateStoreTask extends TestTask {
       .asInstanceOf[KeyValueStore[String, String]]
     val iter = store.all
     iter.asScala.foreach( p => restored += (p.getKey -> p.getValue))
-    System.err.println("ShutdownStateStoreTask.createStream(): %s" format restored)
+    System.out.println("ShutdownStateStoreTask.createStream(): %s" format restored)
     iter.close
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
index 0b405f0..b30b896 100644
--- a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
+++ b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
@@ -157,7 +157,7 @@ public class YarnJobValidationTool {
     coordinatorStreamManager.start();
     coordinatorStreamManager.bootstrap();
     ChangelogStreamManager changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager);
-    JobModelManager jobModelManager = JobModelManager.apply(coordinatorStreamManager, changelogStreamManager.readPartitionMapping());
+    JobModelManager jobModelManager = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping());
     validator.init(config);
     Map<String, String> jmxUrls = jobModelManager.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY);
     for (Map.Entry<String, String> entry : jmxUrls.entrySet()) {

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
index da23b91..1ad4522 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
@@ -106,7 +106,9 @@ class TestSamzaYarnAppMasterService {
     coordinatorStreamManager.start
     coordinatorStreamManager.bootstrap
     val changelogPartitionManager = new ChangelogStreamManager(coordinatorStreamManager)
-    JobModelManager(coordinatorStreamManager, changelogPartitionManager.readPartitionMapping())
+    val jobModelManager = JobModelManager(coordinatorStreamManager.getConfig, changelogPartitionManager.readPartitionMapping())
+    coordinatorStreamManager.stop()
+    jobModelManager
   }
 
   private def getDummyConfig: Config = new MapConfig(Map[String, String](


[2/2] samza git commit: NewSystemConsumer for kafka system

Posted by bo...@apache.org.
NewSystemConsumer for kafka system

Remove SimpleConsumer  and BrokerProxy from Samza's KafkaSystemConsumer implementation.
Instead use KafkaConsumerProxy with high-level kafka consumer.

Author: Boris S <bo...@apache.org>
Author: Boris Shkolnik <bs...@linkedin.com>

Reviewers: Shanthoosh Venktataraman <sp...@usc.edu>, Prateek Maheshwari <pm...@linkedin.com>

Closes #624 from sborya/NewConsumer2


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/003ad106
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/003ad106
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/003ad106

Branch: refs/heads/master
Commit: 003ad1068ac7d36f462b49ded0dbb2e4dedc1089
Parents: 078afb5
Author: Boris S <bo...@apache.org>
Authored: Tue Sep 25 18:07:44 2018 -0700
Committer: Boris S <bo...@apache.org>
Committed: Tue Sep 25 18:07:44 2018 -0700

----------------------------------------------------------------------
 .../samza/system/IncomingMessageEnvelope.java   |   3 +-
 .../ClusterBasedJobCoordinator.java             |   2 +-
 .../apache/samza/storage/StorageRecovery.java   |   2 +-
 .../samza/checkpoint/CheckpointTool.scala       |   2 +-
 .../apache/samza/container/SamzaContainer.scala |   2 +-
 .../samza/coordinator/JobModelManager.scala     |   6 +-
 .../samza/job/local/ProcessJobFactory.scala     |   3 +-
 .../samza/job/local/ThreadJobFactory.scala      |  20 +-
 .../samza/coordinator/TestJobCoordinator.scala  |   4 +-
 .../org/apache/samza/config/KafkaConfig.scala   |   5 +-
 .../samza/config/KafkaConsumerConfig.java       | 210 +++++++++
 .../apache/samza/system/kafka/BrokerProxy.scala | 332 --------------
 .../samza/system/kafka/KafkaConsumerProxy.java  | 423 ++++++++++++++++++
 .../samza/system/kafka/KafkaSystemConsumer.java | 371 ++++++++++++++++
 .../system/kafka/KafkaSystemConsumer.scala      | 309 -------------
 .../kafka/KafkaSystemConsumerMetrics.scala      |  68 ++-
 .../samza/system/kafka/KafkaSystemFactory.scala |  80 ++--
 .../samza/config/TestKafkaConsumerConfig.java   | 150 +++++++
 .../system/kafka/TestKafkaSystemAdminJava.java  |  18 +-
 .../samza/system/kafka/TestBrokerProxy.scala    | 434 -------------------
 .../system/kafka/TestKafkaSystemConsumer.java   | 220 ++++++++++
 .../system/kafka/TestKafkaSystemConsumer.scala  | 191 --------
 .../test/integration/StreamTaskTestUtil.scala   |  17 +-
 .../integration/TestShutdownStatefulTask.scala  |   4 +-
 .../samza/validation/YarnJobValidationTool.java |   2 +-
 .../yarn/TestSamzaYarnAppMasterService.scala    |   4 +-
 26 files changed, 1491 insertions(+), 1391 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
index 4d0ce2f..c5aed31 100644
--- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
+++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
@@ -59,7 +59,8 @@ public class IncomingMessageEnvelope {
    * @param message A deserialized message received from the partition offset.
    * @param size size of the message and key in bytes.
    */
-  public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, Object key, Object message, int size) {
+  public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset,
+      Object key, Object message, int size) {
     this.systemStreamPartition = systemStreamPartition;
     this.offset = offset;
     this.key = key;

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 016d171..12e26f7 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -174,7 +174,7 @@ public class ClusterBasedJobCoordinator {
 
     // build a JobModelManager and ChangelogStreamManager and perform partition assignments.
     changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager);
-    jobModelManager = JobModelManager.apply(coordinatorStreamManager, changelogStreamManager.readPartitionMapping());
+    jobModelManager = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping());
 
     config = jobModelManager.jobModel().getConfig();
     hasDurableStores = new StorageConfig(config).hasDurableStores();

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index bf46018..9a76d75 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -131,7 +131,7 @@ public class StorageRecovery extends CommandLine {
     coordinatorStreamManager.start();
     coordinatorStreamManager.bootstrap();
     ChangelogStreamManager changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager);
-    JobModel jobModel = JobModelManager.apply(coordinatorStreamManager, changelogStreamManager.readPartitionMapping()).jobModel();
+    JobModel jobModel = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping()).jobModel();
     containers = jobModel.getContainers();
     coordinatorStreamManager.stop();
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 0ca8a3d..65fb419 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -170,7 +170,7 @@ class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap, manage
     coordinatorStreamManager.start
     coordinatorStreamManager.bootstrap
     val changelogManager = new ChangelogStreamManager(coordinatorStreamManager)
-    val jobModelManager = JobModelManager(coordinatorStreamManager, changelogManager.readPartitionMapping())
+    val jobModelManager = JobModelManager(coordinatorStreamManager.getConfig, changelogManager.readPartitionMapping())
     val taskNames = jobModelManager
       .jobModel
       .getContainers

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 7b64f5e..fba7329 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -822,7 +822,7 @@ class SamzaContainer(
     }
 
     try {
-      info("Shutting down.")
+      info("Shutting down SamzaContainer.")
       removeShutdownHook
 
       jmxServer.stop

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index f7698c0..600b7a1 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -35,7 +35,6 @@ import org.apache.samza.container.LocalityManager
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.server.HttpServer
 import org.apache.samza.coordinator.server.JobServlet
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.job.model.TaskModel
 import org.apache.samza.metrics.MetricsRegistryMap
@@ -64,12 +63,11 @@ object JobModelManager extends Logging {
    * a) Reads the jobModel from coordinator stream using the job's configuration.
    * b) Recomputes changelog partition mapping based on jobModel and job's configuration.
    * c) Builds JobModelManager using the jobModel read from coordinator stream.
-   * @param coordinatorStreamManager Coordinator stream manager.
+   * @param config Config from the coordinator stream.
    * @param changelogPartitionMapping The changelog partition-to-task mapping.
    * @return JobModelManager
    */
-  def apply(coordinatorStreamManager: CoordinatorStreamManager, changelogPartitionMapping: util.Map[TaskName, Integer]) = {
-    val config = coordinatorStreamManager.getConfig
+  def apply(config: Config, changelogPartitionMapping: util.Map[TaskName, Integer]) = {
     val localityManager = new LocalityManager(config, new MetricsRegistryMap())
 
     // Map the name of each system to the corresponding SystemAdmin

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index 642a484..64f516b 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -50,7 +50,7 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
     coordinatorStreamManager.bootstrap
     val changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager)
 
-    val coordinator = JobModelManager(coordinatorStreamManager, changelogStreamManager.readPartitionMapping())
+    val coordinator = JobModelManager(coordinatorStreamManager.getConfig, changelogStreamManager.readPartitionMapping())
     val jobModel = coordinator.jobModel
 
     val taskPartitionMappings: util.Map[TaskName, Integer] = new util.HashMap[TaskName, Integer]
@@ -61,6 +61,7 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
     }
 
     changelogStreamManager.writePartitionMapping(taskPartitionMappings)
+    coordinatorStreamManager.stop()
 
     //create necessary checkpoint and changelog streams
     val checkpointManager = new TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index abd7f65..bec4ec0 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -20,9 +20,9 @@
 package org.apache.samza.job.local
 
 import org.apache.samza.application.{ApplicationDescriptorUtil, ApplicationUtil}
-import org.apache.samza.config.{Config, TaskConfigJava}
 import org.apache.samza.config.JobConfig._
 import org.apache.samza.config.ShellCommandConfig._
+import org.apache.samza.config.{Config, TaskConfigJava}
 import org.apache.samza.container.{SamzaContainer, SamzaContainerListener, TaskName}
 import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.coordinator.stream.CoordinatorStreamManager
@@ -30,16 +30,15 @@ import org.apache.samza.job.{StreamJob, StreamJobFactory}
 import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap, MetricsReporter}
 import org.apache.samza.runtime.ProcessorContext
 import org.apache.samza.storage.ChangelogStreamManager
-import org.apache.samza.task.TaskFactory
-import org.apache.samza.task.TaskFactoryUtil
+import org.apache.samza.task.{TaskFactory, TaskFactoryUtil}
 import org.apache.samza.util.Logging
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 
 /**
- * Creates a new Thread job with the given config
- */
+  * Creates a new Thread job with the given config
+  */
 class ThreadJobFactory extends StreamJobFactory with Logging {
   def getJob(config: Config): StreamJob = {
     info("Creating a ThreadJob, which is only meant for debugging.")
@@ -51,7 +50,8 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
     coordinatorStreamManager.bootstrap
     val changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager)
 
-    val coordinator = JobModelManager(coordinatorStreamManager, changelogStreamManager.readPartitionMapping())
+    val coordinator = JobModelManager(coordinatorStreamManager.getConfig, changelogStreamManager.readPartitionMapping())
+
     val jobModel = coordinator.jobModel
 
     val taskPartitionMappings: mutable.Map[TaskName, Integer] = mutable.Map[TaskName, Integer]()
@@ -67,6 +67,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
     val checkpointManager = new TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)
     if (checkpointManager != null) {
       checkpointManager.createResources()
+      checkpointManager.stop()
     }
     ChangelogStreamManager.createChangelogStreams(jobModel.getConfig, jobModel.maxChangeLogStreamPartitions)
 
@@ -74,17 +75,17 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
     val jmxServer = new JmxServer
 
     val appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config)
-    val taskFactory : TaskFactory[_] = TaskFactoryUtil.getTaskFactory(appDesc)
+    val taskFactory: TaskFactory[_] = TaskFactoryUtil.getTaskFactory(appDesc)
 
     // Give developers a nice friendly warning if they've specified task.opts and are using a threaded job.
     config.getTaskOpts match {
       case Some(taskOpts) => warn("%s was specified in config, but is not being used because job is being executed with ThreadJob. " +
-        "You probably want to run %s=%s." format (TASK_JVM_OPTS, STREAM_JOB_FACTORY_CLASS, classOf[ProcessJobFactory].getName))
+        "You probably want to run %s=%s." format(TASK_JVM_OPTS, STREAM_JOB_FACTORY_CLASS, classOf[ProcessJobFactory].getName))
       case _ => None
     }
 
     val containerListener = {
-      val processorLifecycleListener = appDesc.getProcessorLifecycleListenerFactory().createInstance(new ProcessorContext() { }, config)
+      val processorLifecycleListener = appDesc.getProcessorLifecycleListenerFactory().createInstance(new ProcessorContext() {}, config)
       new SamzaContainerListener {
         override def afterFailure(t: Throwable): Unit = {
           processorLifecycleListener.afterFailure(t)
@@ -120,6 +121,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
       threadJob
     } finally {
       coordinator.stop
+      coordinatorStreamManager.stop()
       jmxServer.stop
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index 2488355..b7a9bec 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -275,7 +275,9 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
     coordinatorStreamManager.start
     coordinatorStreamManager.bootstrap
     val changelogPartitionManager = new ChangelogStreamManager(coordinatorStreamManager)
-    JobModelManager(coordinatorStreamManager, changelogPartitionManager.readPartitionMapping())
+    val jobModelManager = JobModelManager(coordinatorStreamManager.getConfig, changelogPartitionManager.readPartitionMapping())
+    coordinatorStreamManager.stop()
+    jobModelManager
   }
 
   @Before

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 26664ea..ef43e72 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -289,7 +289,10 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     properties
   }
 
-  // kafka config
+  /**
+    * @deprecated Use KafkaConsumerConfig
+    */
+  @Deprecated
   def getKafkaSystemConsumerConfig( systemName: String,
                                     clientId: String,
                                     groupId: String = "undefined-samza-consumer-group-%s" format UUID.randomUUID.toString,

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
new file mode 100644
index 0000000..3fa66e5
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
@@ -0,0 +1,210 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.config;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.samza.SamzaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.runtime.AbstractFunction0;
+
+
+/**
+ * The configuration class for KafkaConsumer
+ */
+public class KafkaConsumerConfig extends HashMap<String, Object> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerConfig.class);
+
+  static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer";
+  static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer";
+  static final String ADMIN_CLIENT_ID_PREFIX = "samza-admin";
+
+  /*
+   * By default, KafkaConsumer will fetch some big number of available messages for all the partitions.
+   * This may cause memory issues. That's why we will limit the number of messages per partition we get on EACH poll().
+   */
+  static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100";
+
+  private KafkaConsumerConfig(Map<String, Object> map) {
+    super(map);
+  }
+
+  /**
+   * Helper method to create configs for use in Kafka consumer.
+   * The values are based on the "consumer" subset of the configs provided by the app and Samza overrides.
+   *
+   * @param config config provided by the app.
+   * @param systemName system name to get the consumer configuration for.
+   * @param clientId client id to be used in the Kafka consumer.
+   * @return KafkaConsumerConfig
+   */
+  public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, String systemName, String clientId) {
+
+    Config subConf = config.subset(String.format("systems.%s.consumer.", systemName), true);
+
+    //Kafka client configuration
+    String groupId = getConsumerGroupId(config);
+
+    Map<String, Object> consumerProps = new HashMap<>();
+    consumerProps.putAll(subConf);
+
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+    consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+
+    // These are values we enforce in sazma, and they cannot be overwritten.
+
+    // Disable consumer auto-commit because Samza controls commits
+    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+    // Translate samza config value to kafka config value
+    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+        getAutoOffsetResetValue((String) consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)));
+
+    // if consumer bootstrap servers are not configured, get them from the producer configs
+    if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+      String bootstrapServers =
+          config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+      if (StringUtils.isEmpty(bootstrapServers)) {
+        throw new SamzaException("Missing " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " config  for " + systemName);
+      }
+      consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+    }
+
+    // Always use default partition assignment strategy. Do not allow override.
+    consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
+
+    // the consumer is fully typed, and deserialization can be too. But in case it is not provided we should
+    // default to byte[]
+    if (!consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
+      LOG.info("setting key serialization for the consumer(for system {}) to ByteArrayDeserializer", systemName);
+      consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+    }
+    if (!consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
+      LOG.info("setting value serialization for the consumer(for system {}) to ByteArrayDeserializer", systemName);
+      consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+    }
+
+    // Override default max poll config if there is no value
+    consumerProps.computeIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
+        (k) -> DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS);
+
+    return new KafkaConsumerConfig(consumerProps);
+  }
+
+  // group id should be unique per job
+  static String getConsumerGroupId(Config config) {
+    JobConfig jobConfig = new JobConfig(config);
+    Option jobNameOption = jobConfig.getName();
+    if (jobNameOption.isEmpty()) {
+      throw new ConfigException("Missing job name");
+    }
+    String jobName = (String) jobNameOption.get();
+
+    Option jobIdOption = jobConfig.getJobId();
+    String jobId = "1";
+    if (! jobIdOption.isEmpty()) {
+      jobId = (String) jobIdOption.get();
+    }
+
+    return String.format("%s-%s", jobName, jobId);
+  }
+
+  // client id should be unique per job
+  public static String getConsumerClientId(Config config) {
+    return getConsumerClientId(CONSUMER_CLIENT_ID_PREFIX, config);
+  }
+
+  public static String getProducerClientId(Config config) {
+    return getConsumerClientId(PRODUCER_CLIENT_ID_PREFIX, config);
+  }
+
+  public static String getAdminClientId(Config config) {
+    return getConsumerClientId(ADMIN_CLIENT_ID_PREFIX, config);
+  }
+
+  static String getConsumerClientId(String id, Config config) {
+    JobConfig jobConfig = new JobConfig(config);
+    Option jobNameOption = jobConfig.getName();
+    if (jobNameOption.isEmpty()) {
+      throw new ConfigException("Missing job name");
+    }
+    String jobName = (String) jobNameOption.get();
+
+    Option jobIdOption = jobConfig.getJobId();
+    String jobId = "1";
+    if (! jobIdOption.isEmpty()) {
+      jobId = (String) jobIdOption.get();
+    }
+
+    return String.format("%s-%s-%s", id.replaceAll("\\W", "_"), jobName.replaceAll("\\W", "_"),
+        jobId.replaceAll("\\W", "_"));
+  }
+
+  /**
+   * If settings for auto.reset in samza are different from settings in Kafka (auto.offset.reset),
+   * then need to convert them (see kafka.apache.org/documentation):
+   * "largest" -> "latest"
+   * "smallest" -> "earliest"
+   *
+   * If no setting specified we return "latest" (same as Kafka).
+   * @param autoOffsetReset value from the app provided config
+   * @return String representing the config value for "auto.offset.reset" property
+   */
+  static String getAutoOffsetResetValue(final String autoOffsetReset) {
+    final String SAMZA_OFFSET_LARGEST = "largest";
+    final String SAMZA_OFFSET_SMALLEST = "smallest";
+    final String KAFKA_OFFSET_LATEST = "latest";
+    final String KAFKA_OFFSET_EARLIEST = "earliest";
+    final String KAFKA_OFFSET_NONE = "none";
+
+    if (autoOffsetReset == null) {
+      return KAFKA_OFFSET_LATEST; // return default
+    }
+
+    // accept kafka values directly
+    if (autoOffsetReset.equals(KAFKA_OFFSET_EARLIEST) || autoOffsetReset.equals(KAFKA_OFFSET_LATEST)
+        || autoOffsetReset.equals(KAFKA_OFFSET_NONE)) {
+      return autoOffsetReset;
+    }
+
+    String newAutoOffsetReset;
+    switch (autoOffsetReset) {
+      case SAMZA_OFFSET_LARGEST:
+        newAutoOffsetReset = KAFKA_OFFSET_LATEST;
+        break;
+      case SAMZA_OFFSET_SMALLEST:
+        newAutoOffsetReset = KAFKA_OFFSET_EARLIEST;
+        break;
+      default:
+        newAutoOffsetReset = KAFKA_OFFSET_LATEST;
+    }
+    LOG.info("AutoOffsetReset value converted from {} to {}", autoOffsetReset, newAutoOffsetReset);
+    return newAutoOffsetReset;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
deleted file mode 100644
index 423b68a..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.samza.system.kafka
-
-import java.lang.Thread.UncaughtExceptionHandler
-import java.nio.channels.ClosedByInterruptException
-import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
-
-import kafka.api._
-import kafka.common.{ErrorMapping, NotLeaderForPartitionException, TopicAndPartition, UnknownTopicOrPartitionException}
-import kafka.consumer.ConsumerConfig
-import kafka.message.MessageSet
-import org.apache.samza.SamzaException
-import org.apache.samza.util.ExponentialSleepStrategy
-import org.apache.samza.util.KafkaUtil
-import org.apache.samza.util.Logging
-
-import scala.collection.JavaConverters._
-import scala.collection.concurrent
-
-/**
- * A BrokerProxy consolidates Kafka fetches meant for a particular broker and retrieves them all at once, providing
- * a way for consumers to retrieve those messages by topic and partition.
- */
-class BrokerProxy(
-  val host: String,
-  val port: Int,
-  val system: String,
-  val clientID: String,
-  val metrics: KafkaSystemConsumerMetrics,
-  val messageSink: MessageSink,
-  val timeout: Int = ConsumerConfig.SocketTimeout,
-  val bufferSize: Int = ConsumerConfig.SocketBufferSize,
-  val fetchSize: StreamFetchSizes = new StreamFetchSizes,
-  val consumerMinSize:Int = ConsumerConfig.MinFetchBytes,
-  val consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs,
-  offsetGetter: GetOffset = new GetOffset("fail")) extends Toss with Logging {
-
-  /**
-   * How long should the fetcher thread sleep before checking if any TopicPartitions has been added to its purview
-   */
-  val sleepMSWhileNoTopicPartitions = 100
-
-  /** What's the next offset for a particular partition? **/
-  val nextOffsets:concurrent.Map[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]().asScala
-
-  /** Block on the first call to get message if the fetcher has not yet returned its initial results **/
-  // TODO: It should be sufficient to just use the count down latch and await on it for each of the calls, but
-  // VisualVM was showing the consumer thread spending all its time in the await method rather than returning
-  // immediately, even though the process was proceeding normally.  Hence the extra boolean.  Should be investigated.
-  val firstCallBarrier = new CountDownLatch(1)
-  var firstCall = true
-
-  var simpleConsumer = createSimpleConsumer()
-
-  metrics.registerBrokerProxy(host, port)
-
-  def createSimpleConsumer() = {
-    val hostString = "%s:%d" format (host, port)
-    info("Creating new SimpleConsumer for host %s for system %s" format (hostString, system))
-
-    val sc = new DefaultFetchSimpleConsumer(host, port, timeout, bufferSize, clientID, fetchSize, consumerMinSize, consumerMaxWait)
-    sc
-  }
-
-  def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
-    debug("Adding new topic and partition %s to queue for %s" format (tp, host))
-
-    if (nextOffsets.asJava.containsKey(tp)) {
-      toss("Already consuming TopicPartition %s" format tp)
-    }
-
-    val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
-      nextOffset
-        .get
-        .toLong
-    } else {
-      warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
-
-      offsetGetter.getResetOffset(simpleConsumer, tp)
-    }
-
-    debug("Got offset %s for new topic and partition %s." format (offset, tp))
-
-    nextOffsets += tp -> offset
-
-    metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
-  }
-
-  def removeTopicPartition(tp: TopicAndPartition) = {
-    if (nextOffsets.asJava.containsKey(tp)) {
-      val offset = nextOffsets.remove(tp)
-      metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
-      debug("Removed %s" format tp)
-      offset
-    } else {
-      warn("Asked to remove topic and partition %s, but not in map (keys = %s)" format (tp, nextOffsets.keys.mkString(",")))
-      None
-    }
-  }
-
-  val thread = new Thread(new Runnable {
-    def run {
-      var reconnect = false
-
-      try {
-        (new ExponentialSleepStrategy).run(
-          loop => {
-            if (reconnect) {
-              metrics.reconnects.get((host, port)).inc
-              simpleConsumer.close()
-              simpleConsumer = createSimpleConsumer()
-            }
-
-            while (!Thread.currentThread.isInterrupted) {
-              messageSink.refreshDropped
-              if (nextOffsets.size == 0) {
-                debug("No TopicPartitions to fetch. Sleeping.")
-                Thread.sleep(sleepMSWhileNoTopicPartitions)
-              } else {
-                fetchMessages
-
-                // If we got here, fetchMessages didn't throw an exception, i.e. it was successful.
-                // In that case, reset the loop delay, so that the next time an error occurs,
-                // we start with a short retry delay.
-                loop.reset
-              }
-            }
-          },
-
-          (exception, loop) => {
-            warn("Restarting consumer due to %s. Releasing ownership of all partitions, and restarting consumer. Turn on debugging to get a full stack trace." format exception)
-            debug("Exception detail:", exception)
-            abdicateAll
-            reconnect = true
-          })
-      } catch {
-        case e: InterruptedException       => info("Got interrupt exception in broker proxy thread.")
-        case e: ClosedByInterruptException => info("Got closed by interrupt exception in broker proxy thread.")
-        case e: OutOfMemoryError           => throw new SamzaException("Got out of memory error in broker proxy thread.")
-        case e: StackOverflowError         => throw new SamzaException("Got stack overflow error in broker proxy thread.")
-      }
-
-      if (Thread.currentThread.isInterrupted) info("Shutting down due to interrupt.")
-    }
-  }, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, clientID))
-
-  private def fetchMessages(): Unit = {
-    val topicAndPartitionsToFetch = nextOffsets.filterKeys(messageSink.needsMoreMessages(_)).toList
-
-    if (topicAndPartitionsToFetch.size > 0) {
-      metrics.brokerReads.get((host, port)).inc
-      val response: FetchResponse = simpleConsumer.defaultFetch(topicAndPartitionsToFetch: _*)
-      firstCall = false
-      firstCallBarrier.countDown()
-
-      // Split response into errors and non errors, processing the errors first
-      val (nonErrorResponses, errorResponses) = response.data.toSet.partition(_._2.error.code() == ErrorMapping.NoError)
-
-      handleErrors(errorResponses, response)
-
-      nonErrorResponses.foreach { case (tp, data) => moveMessagesToTheirQueue(tp, data) }
-    } else {
-      refreshLatencyMetrics
-
-      debug("No topic/partitions need to be fetched for %s:%s right now. Sleeping %sms." format (host, port, sleepMSWhileNoTopicPartitions))
-
-      metrics.brokerSkippedFetchRequests.get((host, port)).inc
-
-      Thread.sleep(sleepMSWhileNoTopicPartitions)
-    }
-  }
-
-  /**
-   * Releases ownership for a single TopicAndPartition. The
-   * KafkaSystemConsumer will try and find a new broker for the
-   * TopicAndPartition.
-   */
-  def abdicate(tp: TopicAndPartition) = removeTopicPartition(tp) match {
-    // Need to be mindful of a tp that was removed by another thread
-    case Some(offset) => messageSink.abdicate(tp, offset)
-    case None => warn("Tried to abdicate for topic partition not in map. Removed in interim?")
-  }
-
-  /**
-   * Releases all TopicAndPartition ownership for this BrokerProxy thread. The
-   * KafkaSystemConsumer will try and find a new broker for the
-   * TopicAndPartition.
-   */
-  def abdicateAll {
-    info("Abdicating all topic partitions.")
-    val immutableNextOffsetsCopy = nextOffsets.toMap
-    immutableNextOffsetsCopy.keySet.foreach(abdicate(_))
-  }
-
-  def handleErrors(errorResponses: Set[(TopicAndPartition, FetchResponsePartitionData)], response: FetchResponse) = {
-    // FetchResponse should really return Option and a list of the errors so we don't have to find them ourselves
-    case class Error(tp: TopicAndPartition, code: Short, exception: Exception)
-
-    // Now subdivide the errors into three types: non-recoverable, not leader (== abdicate) and offset out of range (== get new offset)
-
-    // Convert FetchResponse into easier-to-work-with Errors
-    val errors = for (
-      (topicAndPartition, responseData) <- errorResponses;
-      error <- Option(response.error(topicAndPartition.topic, topicAndPartition.partition)) // Scala's being cranky about referring to error.getKey values...
-    ) yield new Error(topicAndPartition, error.code(), error.exception())
-
-    val (notLeaderOrUnknownTopic, otherErrors) = errors.partition { case (e) => e.code == ErrorMapping.NotLeaderForPartitionCode || e.code == ErrorMapping.UnknownTopicOrPartitionCode }
-    val (offsetOutOfRangeErrors, remainingErrors) = otherErrors.partition(_.code == ErrorMapping.OffsetOutOfRangeCode)
-
-    // Can recover from two types of errors: not leader (go find the new leader) and offset out of range (go get the new offset)
-    // However, we want to bail as quickly as possible if there are non recoverable errors so that the state of the other
-    // topic-partitions remains the same.  That way, when we've rebuilt the simple consumer, we can come around and
-    // handle the recoverable errors.
-    remainingErrors.foreach(e => {
-      warn("Got non-recoverable error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s" format remainingErrors.mkString(","))
-      KafkaUtil.maybeThrowException(e.exception) })
-
-    notLeaderOrUnknownTopic.foreach(e => {
-      warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception %s for %s. Abdicating" format(e.code, e.tp))
-      abdicate(e.tp)
-    })
-
-    offsetOutOfRangeErrors.foreach(e => {
-      warn("Received OffsetOutOfRange exception for %s. Current offset = %s" format (e.tp, nextOffsets.getOrElse(e.tp, "not found in map, likely removed in the interim")))
-
-      try {
-        val newOffset = offsetGetter.getResetOffset(simpleConsumer, e.tp)
-        // Put the new offset into the map (if the tp still exists).  Will catch it on the next go-around
-        nextOffsets.replace(e.tp, newOffset)
-      } catch {
-        // UnknownTopic or NotLeader are routine events and handled via abdication.  All others, bail.
-        case _ @ (_:UnknownTopicOrPartitionException | _: NotLeaderForPartitionException) => warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception %s for %s. Abdicating" format(e.code, e.tp))
-                                                                                             abdicate(e.tp)
-      }
-    })
-  }
-
-  def moveMessagesToTheirQueue(tp: TopicAndPartition, data: FetchResponsePartitionData) = {
-    val messageSet: MessageSet = data.messages
-    var nextOffset = nextOffsets(tp)
-
-    messageSink.setIsAtHighWatermark(tp, data.hw == 0 || data.hw == nextOffset)
-    require(messageSet != null)
-    for (message <- messageSet.iterator) {
-      messageSink.addMessage(tp, message, data.hw) // TODO: Verify this is correct
-
-      nextOffset = message.nextOffset
-
-      val bytesSize = message.message.payloadSize + message.message.keySize
-      metrics.reads.get(tp).inc
-      metrics.bytesRead.get(tp).inc(bytesSize)
-      metrics.brokerBytesRead.get((host, port)).inc(bytesSize)
-      metrics.offsets.get(tp).set(nextOffset)
-    }
-
-    nextOffsets.replace(tp, nextOffset) // use replace rather than put in case this tp was removed while we were fetching.
-
-    // Update high water mark
-    val hw = data.hw
-    if (hw >= 0) {
-      metrics.highWatermark.get(tp).set(hw)
-      metrics.lag.get(tp).set(hw - nextOffset)
-    } else {
-      debug("Got a high water mark less than 0 (%d) for %s, so skipping." format (hw, tp))
-    }
-  }
-  override def toString() = "BrokerProxy for %s:%d" format (host, port)
-
-  def start {
-    if (!thread.isAlive) {
-      info("Starting " + toString)
-      thread.setDaemon(true)
-      thread.setName("Samza BrokerProxy " + thread.getName)
-      thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
-        override def uncaughtException(t: Thread, e: Throwable) = error("Uncaught exception in broker proxy:", e)
-      })
-      thread.start
-    } else {
-      debug("Tried to start an already started broker proxy (%s). Ignoring." format toString)
-    }
-  }
-
-  def stop {
-    info("Shutting down " + toString)
-
-    if (simpleConsumer != null) {
-      info("closing simple consumer...")
-      simpleConsumer.close
-    }
-
-    thread.interrupt
-    thread.join
-  }
-
-  private def refreshLatencyMetrics {
-    nextOffsets.foreach{
-      case (topicAndPartition, offset) => {
-        val latestOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, -1, Request.OrdinaryConsumerId)
-        trace("latest offset of %s is %s" format (topicAndPartition, latestOffset))
-        if (latestOffset >= 0) {
-          // only update the registered topicAndpartitions
-          if(metrics.highWatermark.containsKey(topicAndPartition)) {
-            metrics.highWatermark.get(topicAndPartition).set(latestOffset)
-          }
-          if(metrics.lag.containsKey(topicAndPartition)) {
-            metrics.lag.get(topicAndPartition).set(latestOffset - offset)
-          }
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
new file mode 100644
index 0000000..04071c1
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -0,0 +1,423 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import kafka.common.TopicAndPartition;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class contains a separate thread that reads messages from kafka and puts them  into the BlockingEnvelopeMap
+ * through KafkaSystemConsumer.KafkaConsumerMessageSink object.
+ * This class is not thread safe. There will be only one instance of this class per KafkaSystemConsumer object.
+ * We still need some synchronization around kafkaConsumer. See pollConsumer() method for details.
+ */
+class KafkaConsumerProxy<K, V> {
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerProxy.class);
+
+  private static final int SLEEP_MS_WHILE_NO_TOPIC_PARTITION = 100;
+
+  final Thread consumerPollThread;
+  private final Consumer<K, V> kafkaConsumer;
+  private final KafkaSystemConsumer.KafkaConsumerMessageSink sink;
+  private final KafkaSystemConsumerMetrics kafkaConsumerMetrics;
+  private final String metricName;
+  private final String systemName;
+  private final String clientId;
+  private final Map<TopicPartition, SystemStreamPartition> topicPartitionToSSP = new HashMap<>();
+  private final Map<SystemStreamPartition, MetricName> perPartitionMetrics = new HashMap<>();
+  // list of all the SSPs we poll from, with their next(most recently read + 1) offsets correspondingly.
+  private final Map<SystemStreamPartition, Long> nextOffsets = new ConcurrentHashMap<>();
+  // lags behind the high water mark, as reported by the Kafka consumer.
+  private final Map<SystemStreamPartition, Long> latestLags = new HashMap<>();
+
+  private volatile boolean isRunning = false;
+  private volatile Throwable failureCause = null;
+  private final CountDownLatch consumerPollThreadStartLatch = new CountDownLatch(1);
+
+  KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, String clientId,
+      KafkaSystemConsumer.KafkaConsumerMessageSink messageSink, KafkaSystemConsumerMetrics samzaConsumerMetrics,
+      String metricName) {
+
+    this.kafkaConsumer = kafkaConsumer;
+    this.systemName = systemName;
+    this.sink = messageSink;
+    this.kafkaConsumerMetrics = samzaConsumerMetrics;
+    this.metricName = metricName;
+    this.clientId = clientId;
+
+    this.kafkaConsumerMetrics.registerClientProxy(metricName);
+
+    consumerPollThread = new Thread(createProxyThreadRunnable());
+    consumerPollThread.setDaemon(true);
+    consumerPollThread.setName(
+        "Samza KafkaConsumerProxy Poll " + consumerPollThread.getName() + " - " + systemName);
+  }
+
+  /**
+   * Add new partition to the list of polled partitions.
+   * Bust only be called before {@link KafkaConsumerProxy#start} is called..
+   */
+  public void addTopicPartition(SystemStreamPartition ssp, long nextOffset) {
+    LOG.info(String.format("Adding new topicPartition %s with offset %s to queue for consumer %s", ssp, nextOffset,
+        this));
+    topicPartitionToSSP.put(KafkaSystemConsumer.toTopicPartition(ssp), ssp); //registered SSPs
+
+    // this is already vetted offset so there is no need to validate it
+    nextOffsets.put(ssp, nextOffset);
+
+    kafkaConsumerMetrics.setNumTopicPartitions(metricName, nextOffsets.size());
+  }
+
+  /**
+   * Stop this KafkaConsumerProxy and wait for at most {@code timeoutMs}.
+   * @param timeoutMs maximum time to wait to stop this KafkaConsumerProxy
+   */
+  public void stop(long timeoutMs) {
+    LOG.info("Shutting down KafkaConsumerProxy poll thread {} for {}", consumerPollThread.getName(), this);
+
+    isRunning = false;
+    try {
+      consumerPollThread.join(timeoutMs/2);
+      // join() may timeout
+      // in this case we should interrupt it and wait again
+      if (consumerPollThread.isAlive()) {
+        consumerPollThread.interrupt();
+        consumerPollThread.join(timeoutMs/2);
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Join in KafkaConsumerProxy has failed", e);
+      consumerPollThread.interrupt();
+    }
+  }
+
+  public void start() {
+    if (!consumerPollThread.isAlive()) {
+      LOG.info("Starting KafkaConsumerProxy polling thread for " + this.toString());
+
+      consumerPollThread.start();
+
+      // we need to wait until the thread starts
+      while (!isRunning && failureCause == null) {
+        try {
+          consumerPollThreadStartLatch.await(3000, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+          LOG.info("Ignoring InterruptedException while waiting for consumer poll thread to start.", e);
+        }
+      }
+    } else {
+      LOG.warn("Tried to start an already started KafkaConsumerProxy (%s). Ignoring.", this.toString());
+    }
+
+    if (topicPartitionToSSP.size() == 0) {
+      String msg = String.format("Cannot start KafkaConsumerProxy without any registered TopicPartitions for %s", systemName);
+      LOG.error(msg);
+      throw new SamzaException(msg);
+    }
+  }
+
+  boolean isRunning() {
+    return isRunning;
+  }
+
+  Throwable getFailureCause() {
+    return failureCause;
+  }
+
+  private void initializeLags() {
+    // This is expensive, so only do it once at the beginning. After the first poll, we can rely on metrics for lag.
+    Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitionToSSP.keySet());
+    endOffsets.forEach((tp, offset) -> {
+      SystemStreamPartition ssp = topicPartitionToSSP.get(tp);
+      long startingOffset = nextOffsets.get(ssp);
+      // End offsets are the offset of the newest message + 1
+      // If the message we are about to consume is < end offset, we are starting with a lag.
+      long initialLag = endOffsets.get(tp) - startingOffset;
+
+      LOG.info("Initial lag for SSP {} is {} (end={}, startOffset={})", ssp, initialLag, endOffsets.get(tp), startingOffset);
+      latestLags.put(ssp, initialLag);
+      sink.setIsAtHighWatermark(ssp, initialLag == 0);
+    });
+
+    // initialize lag metrics
+    refreshLagMetrics();
+  }
+
+  // creates a separate thread for getting the messages.
+  private Runnable createProxyThreadRunnable() {
+    Runnable runnable = () -> {
+      isRunning = true;
+
+      try {
+        consumerPollThreadStartLatch.countDown();
+        LOG.info("Starting consumer poll thread {} for system {}", consumerPollThread.getName(), systemName);
+        initializeLags();
+        while (isRunning) {
+          fetchMessages();
+        }
+      } catch (Throwable throwable) {
+        LOG.error(String.format("Error in KafkaConsumerProxy poll thread for system: %s.", systemName), throwable);
+        // KafkaSystemConsumer uses the failureCause to propagate the throwable to the container
+        failureCause = throwable;
+        isRunning = false;
+      }
+
+      if (!isRunning) {
+        LOG.info("KafkaConsumerProxy for system {} has stopped.", systemName);
+      }
+    };
+
+    return runnable;
+  }
+
+  private void fetchMessages() {
+    Set<SystemStreamPartition> sspsToFetch = new HashSet<>();
+    for (SystemStreamPartition ssp : nextOffsets.keySet()) {
+      if (sink.needsMoreMessages(ssp)) {
+        sspsToFetch.add(ssp);
+      }
+    }
+    LOG.debug("pollConsumer for {} SSPs: {}", sspsToFetch.size(), sspsToFetch);
+    if (!sspsToFetch.isEmpty()) {
+      kafkaConsumerMetrics.incClientReads(metricName);
+
+      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> response;
+
+      response = pollConsumer(sspsToFetch, 500L);
+
+      // move the responses into the queue
+      for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> e : response.entrySet()) {
+        List<IncomingMessageEnvelope> envelopes = e.getValue();
+        if (envelopes != null) {
+          moveMessagesToTheirQueue(e.getKey(), envelopes);
+        }
+      }
+
+      populateCurrentLags(sspsToFetch); // find current lags for for each SSP
+    } else { // nothing to read
+
+      LOG.debug("No topic/partitions need to be fetched for system {} right now. Sleeping {}ms.", systemName,
+          SLEEP_MS_WHILE_NO_TOPIC_PARTITION);
+
+      kafkaConsumerMetrics.incClientSkippedFetchRequests(metricName);
+
+      try {
+        Thread.sleep(SLEEP_MS_WHILE_NO_TOPIC_PARTITION);
+      } catch (InterruptedException e) {
+        LOG.warn("Sleep in fetchMessages was interrupted");
+      }
+    }
+    refreshLagMetrics();
+  }
+
+  // the actual polling of the messages from kafka
+  private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> pollConsumer(
+      Set<SystemStreamPartition> systemStreamPartitions, long timeoutMs) {
+
+    // Since we need to poll only from some subset of TopicPartitions (passed as the argument),
+    // we need to pause the rest.
+    List<TopicPartition> topicPartitionsToPause = new ArrayList<>();
+    List<TopicPartition> topicPartitionsToPoll = new ArrayList<>();
+
+    for (Map.Entry<TopicPartition, SystemStreamPartition> e : topicPartitionToSSP.entrySet()) {
+      TopicPartition tp = e.getKey();
+      SystemStreamPartition ssp = e.getValue();
+      if (systemStreamPartitions.contains(ssp)) {
+        topicPartitionsToPoll.add(tp);  // consume
+      } else {
+        topicPartitionsToPause.add(tp); // ignore
+      }
+    }
+
+    ConsumerRecords<K, V> records;
+    try {
+      // Synchronize, in case the consumer is used in some other thread (metadata or something else)
+      synchronized (kafkaConsumer) {
+        // Since we are not polling from ALL the subscribed topics, so we need to "change" the subscription temporarily
+        kafkaConsumer.pause(topicPartitionsToPause);
+        kafkaConsumer.resume(topicPartitionsToPoll);
+        records = kafkaConsumer.poll(timeoutMs);
+      }
+    } catch (Exception e) {
+      // we may get InvalidOffsetException | AuthorizationException | KafkaException exceptions,
+      // but we still just rethrow, and log it up the stack.
+      LOG.error("Caught a Kafka exception in pollConsumer for system " + systemName, e);
+      throw e;
+    }
+
+    return processResults(records);
+  }
+
+  private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> processResults(ConsumerRecords<K, V> records) {
+    if (records == null) {
+      throw new SamzaException("Received null 'records' after polling consumer in KafkaConsumerProxy " + this);
+    }
+
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> results = new HashMap<>(records.count());
+    // Parse the returned records and convert them into the IncomingMessageEnvelope.
+    for (ConsumerRecord<K, V> record : records) {
+      int partition = record.partition();
+      String topic = record.topic();
+      TopicPartition tp = new TopicPartition(topic, partition);
+
+      updateMetrics(record, tp);
+
+      SystemStreamPartition ssp = topicPartitionToSSP.get(tp);
+      List<IncomingMessageEnvelope> messages = results.get(ssp);
+      if (messages == null) {
+        messages = new ArrayList<>();
+        results.put(ssp, messages);
+      }
+
+      K key = record.key();
+      Object value = record.value();
+      IncomingMessageEnvelope imEnvelope =
+          new IncomingMessageEnvelope(ssp, String.valueOf(record.offset()), key, value, getRecordSize(record));
+      messages.add(imEnvelope);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("# records per SSP:");
+      for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> e : results.entrySet()) {
+        List<IncomingMessageEnvelope> list = e.getValue();
+        LOG.debug(e.getKey() + " = " + ((list == null) ? 0 : list.size()));
+      }
+    }
+
+    return results;
+  }
+
+  private int getRecordSize(ConsumerRecord<K, V> r) {
+    int keySize = (r.key() == null) ? 0 : r.serializedKeySize();
+    return keySize + r.serializedValueSize();
+  }
+
+  private void updateMetrics(ConsumerRecord<K, V> r, TopicPartition tp) {
+    TopicAndPartition tap = KafkaSystemConsumer.toTopicAndPartition(tp);
+    SystemStreamPartition ssp = new SystemStreamPartition(systemName, tp.topic(), new Partition(tp.partition()));
+
+    Long lag = latestLags.get(ssp);
+    if (lag == null) {
+      throw new SamzaException("Unknown/unregistered ssp in latestLags. ssp=" + ssp + "; system=" + systemName);
+    }
+    long currentSSPLag = lag.longValue(); // lag between the current offset and the highwatermark
+    if (currentSSPLag < 0) {
+      return;
+    }
+
+    long recordOffset = r.offset();
+    long highWatermark = recordOffset + currentSSPLag; // derived value for the highwatermark
+
+    int size = getRecordSize(r);
+    kafkaConsumerMetrics.incReads(tap);
+    kafkaConsumerMetrics.incBytesReads(tap, size);
+    kafkaConsumerMetrics.setOffsets(tap, recordOffset);
+    kafkaConsumerMetrics.incClientBytesReads(metricName, size);
+    kafkaConsumerMetrics.setHighWatermarkValue(tap, highWatermark);
+  }
+
+  private void moveMessagesToTheirQueue(SystemStreamPartition ssp, List<IncomingMessageEnvelope> envelopes) {
+    long nextOffset = nextOffsets.get(ssp);
+
+    for (IncomingMessageEnvelope env : envelopes) {
+      sink.addMessage(ssp, env);  // move message to the BlockingEnvelopeMap's queue
+
+      LOG.trace("IncomingMessageEnvelope. got envelope with offset:{} for ssp={}", env.getOffset(), ssp);
+      nextOffset = Long.valueOf(env.getOffset()) + 1;
+    }
+
+    nextOffsets.put(ssp, nextOffset);
+  }
+
+  // The only way to figure out lag for the KafkaConsumer is to look at the metrics after each poll() call.
+  // One of the metrics (records-lag) shows how far behind the HighWatermark the consumer is.
+  // This method populates the lag information for each SSP into latestLags member variable.
+  private void populateCurrentLags(Set<SystemStreamPartition> ssps) {
+
+    Map<MetricName, ? extends Metric> consumerMetrics = kafkaConsumer.metrics();
+
+    // populate the MetricNames first time
+    if (perPartitionMetrics.isEmpty()) {
+      HashMap<String, String> tags = new HashMap<>();
+      tags.put("client-id", clientId); // this is required by the KafkaConsumer to get the metrics
+
+      for (SystemStreamPartition ssp : ssps) {
+        TopicPartition tp = KafkaSystemConsumer.toTopicPartition(ssp);
+        perPartitionMetrics.put(ssp, new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags));
+      }
+    }
+
+    for (SystemStreamPartition ssp : ssps) {
+      MetricName mn = perPartitionMetrics.get(ssp);
+      Metric currentLagMetric = consumerMetrics.get(mn);
+
+      // High watermark is fixed to be the offset of last available message,
+      // so the lag is now at least 0, which is the same as Samza's definition.
+      // If the lag is not 0, then isAtHead is not true, and kafkaClient keeps polling.
+      long currentLag = (currentLagMetric != null) ? (long) currentLagMetric.value() : -1L;
+      latestLags.put(ssp, currentLag);
+
+      // calls the setIsAtHead for the BlockingEnvelopeMap
+      sink.setIsAtHighWatermark(ssp, currentLag == 0);
+    }
+  }
+
+  private void refreshLagMetrics() {
+    for (Map.Entry<SystemStreamPartition, Long> e : nextOffsets.entrySet()) {
+      SystemStreamPartition ssp = e.getKey();
+      Long offset = e.getValue();
+      TopicAndPartition tp = new TopicAndPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
+      Long lag = latestLags.get(ssp);
+      LOG.trace("Latest offset of {} is  {}; lag = {}", ssp, offset, lag);
+      if (lag != null && offset != null && lag >= 0) {
+        long streamEndOffset = offset.longValue() + lag.longValue();
+        // update the metrics
+        kafkaConsumerMetrics.setHighWatermarkValue(tp, streamEndOffset);
+        kafkaConsumerMetrics.setLagValue(tp, lag.longValue());
+      }
+    }
+  }
+
+   @Override
+  public String toString() {
+    return String.format("consumerProxy-%s-%s", systemName, clientId);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
new file mode 100644
index 0000000..10ce274
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -0,0 +1,371 @@
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import kafka.common.TopicAndPartition;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.KafkaConsumerConfig;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.BlockingEnvelopeMap;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+
+public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements SystemConsumer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaSystemConsumer.class);
+
+  private static final long FETCH_THRESHOLD = 50000;
+  private static final long FETCH_THRESHOLD_BYTES = -1L;
+
+  private final Consumer<K, V> kafkaConsumer;
+  private final String systemName;
+  private final String clientId;
+  private final AtomicBoolean stopped = new AtomicBoolean(false);
+  private final AtomicBoolean started = new AtomicBoolean(false);
+  private final Config config;
+  private final boolean fetchThresholdBytesEnabled;
+  private final KafkaSystemConsumerMetrics metrics;
+
+  // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap.
+  final KafkaConsumerMessageSink messageSink;
+
+  // This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates
+  // BlockingEnvelopMap's buffers.
+  final private KafkaConsumerProxy proxy;
+
+  // keep registration data until the start - mapping between registered SSPs and topicPartitions, and their offsets
+  final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>();
+  final Map<TopicPartition, SystemStreamPartition> topicPartitionsToSSP = new HashMap<>();
+
+  long perPartitionFetchThreshold;
+  long perPartitionFetchThresholdBytes;
+
+  /**
+   * Create a KafkaSystemConsumer for the provided {@code systemName}
+   * @param systemName system name for which we create the consumer
+   * @param config application config
+   * @param metrics metrics for this KafkaSystemConsumer
+   * @param clock system clock
+   */
+  public KafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, Config config, String clientId,
+      KafkaSystemConsumerMetrics metrics, Clock clock) {
+
+    super(metrics.registry(), clock, metrics.getClass().getName());
+
+    this.kafkaConsumer = kafkaConsumer;
+    this.clientId = clientId;
+    this.systemName = systemName;
+    this.config = config;
+    this.metrics = metrics;
+
+    fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName);
+
+    // create a sink for passing the messages between the proxy and the consumer
+    messageSink = new KafkaConsumerMessageSink();
+
+    // Create the proxy to do the actual message reading.
+    String metricName = String.format("%s", systemName);
+    proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, metrics, metricName);
+    LOG.info("{}: Created KafkaConsumerProxy {} ", this, proxy);
+  }
+
+  /**
+   * Create internal kafka consumer object, which will be used in the Proxy.
+   * @param systemName system name for which we create the consumer
+   * @param clientId client id to use int the kafka client
+   * @param config config
+   * @return kafka consumer object
+   */
+  public static KafkaConsumer<byte[], byte[]> getKafkaConsumerImpl(String systemName, String clientId, Config config) {
+
+    // extract kafka client configs
+    KafkaConsumerConfig consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId);
+
+    LOG.info("{}: KafkaClient properties {}", systemName, consumerConfig);
+
+    return new KafkaConsumer(consumerConfig);
+  }
+
+  @Override
+  public void start() {
+    if (!started.compareAndSet(false, true)) {
+      LOG.warn("{}: Attempting to start the consumer for the second (or more) time.", this);
+      return;
+    }
+    if (stopped.get()) {
+      LOG.error("{}: Attempting to start a stopped consumer", this);
+      return;
+    }
+    // initialize the subscriptions for all the registered TopicPartitions
+    startSubscription();
+    // needs to be called after all the registrations are completed
+    setFetchThresholds();
+
+    startConsumer();
+    LOG.info("{}: Consumer started", this);
+  }
+
+  private void startSubscription() {
+    //subscribe to all the registered TopicPartitions
+    LOG.info("{}: Consumer subscribes to {}", this, topicPartitionsToSSP.keySet());
+    try {
+      synchronized (kafkaConsumer) {
+        // we are using assign (and not subscribe), so we need to specify both topic and partition
+        kafkaConsumer.assign(topicPartitionsToSSP.keySet());
+      }
+    } catch (Exception e) {
+      throw new SamzaException("Consumer subscription failed for " + this, e);
+    }
+  }
+
+  /**
+   * Set the offsets to start from.
+   * Register the TopicPartitions with the proxy.
+   * Start the proxy.
+   */
+  void startConsumer() {
+    // set the offset for each TopicPartition
+    if (topicPartitionsToOffset.size() <= 0) {
+      LOG.error ("{}: Consumer is not subscribed to any SSPs", this);
+    }
+
+    topicPartitionsToOffset.forEach((tp, startingOffsetString) -> {
+      long startingOffset = Long.valueOf(startingOffsetString);
+
+      try {
+        synchronized (kafkaConsumer) {
+          kafkaConsumer.seek(tp, startingOffset); // this value should already be the 'upcoming' value
+        }
+      } catch (Exception e) {
+        // all recoverable execptions are handled by the client.
+        // if we get here there is nothing left to do but bail out.
+        String msg =
+            String.format("%s: Got Exception while seeking to %s for partition %s", this, startingOffsetString, tp);
+        LOG.error(msg, e);
+        throw new SamzaException(msg, e);
+      }
+
+      LOG.info("{}: Changing consumer's starting offset for tp = %s to %s", this, tp, startingOffsetString);
+
+      // add the partition to the proxy
+      proxy.addTopicPartition(topicPartitionsToSSP.get(tp), startingOffset);
+    });
+
+    // start the proxy thread
+    if (proxy != null && !proxy.isRunning()) {
+      LOG.info("{}: Starting proxy {}", this, proxy);
+      proxy.start();
+    }
+  }
+
+  private void setFetchThresholds() {
+    // get the thresholds, and set defaults if not defined.
+    KafkaConfig kafkaConfig = new KafkaConfig(config);
+
+    Option<String> fetchThresholdOption = kafkaConfig.getConsumerFetchThreshold(systemName);
+    long fetchThreshold = FETCH_THRESHOLD;
+    if (fetchThresholdOption.isDefined()) {
+      fetchThreshold = Long.valueOf(fetchThresholdOption.get());
+    }
+
+    Option<String> fetchThresholdBytesOption = kafkaConfig.getConsumerFetchThresholdBytes(systemName);
+    long fetchThresholdBytes = FETCH_THRESHOLD_BYTES;
+    if (fetchThresholdBytesOption.isDefined()) {
+      fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get());
+    }
+
+    int numPartitions = topicPartitionsToSSP.size();
+    if (numPartitions != topicPartitionsToOffset.size()) {
+      throw new SamzaException("topicPartitionsToSSP.size() doesn't match topicPartitionsToOffset.size()");
+    }
+
+
+    if (numPartitions > 0) {
+      perPartitionFetchThreshold = fetchThreshold / numPartitions;
+      if (fetchThresholdBytesEnabled) {
+        // currently this feature cannot be enabled, because we do not have the size of the messages available.
+        // messages get double buffered, hence divide by 2
+        perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numPartitions;
+      }
+    }
+    LOG.info("{}: fetchThresholdBytes = {}; fetchThreshold={}; numPartitions={}, perPartitionFetchThreshold={}, perPartitionFetchThresholdBytes(0 if disabled)={}",
+        this, fetchThresholdBytes, fetchThreshold, numPartitions, perPartitionFetchThreshold, perPartitionFetchThresholdBytes);
+  }
+
+  @Override
+  public void stop() {
+    if (!stopped.compareAndSet(false, true)) {
+      LOG.warn("{}: Attempting to stop stopped consumer.", this);
+      return;
+    }
+
+    LOG.info("{}: Stopping Samza kafkaConsumer ", this);
+
+    // stop the proxy (with 1 minute timeout)
+    if (proxy != null) {
+      LOG.info("{}: Stopping proxy {}", this, proxy);
+      proxy.stop(TimeUnit.SECONDS.toMillis(60));
+    }
+
+    try {
+      synchronized (kafkaConsumer) {
+        LOG.info("{}: Closing kafkaSystemConsumer {}", this, kafkaConsumer);
+        kafkaConsumer.close();
+      }
+    } catch (Exception e) {
+      LOG.warn("{}: Failed to stop KafkaSystemConsumer.", this, e);
+    }
+  }
+
+  /**
+   * record the ssp and the offset. Do not submit it to the consumer yet.
+   * @param systemStreamPartition ssp to register
+   * @param offset offset to register with
+   */
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, String offset) {
+    if (started.get()) {
+      String msg = String.format("%s: Trying to register partition after consumer has been started. ssp=%s", this,
+          systemStreamPartition);
+      throw new SamzaException(msg);
+    }
+
+    if (!systemStreamPartition.getSystem().equals(systemName)) {
+      LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition);
+      return;
+    }
+    LOG.info("{}: Registering ssp = {} with offset {}", this, systemStreamPartition, offset);
+
+    super.register(systemStreamPartition, offset);
+
+    TopicPartition tp = toTopicPartition(systemStreamPartition);
+
+    topicPartitionsToSSP.put(tp, systemStreamPartition);
+
+    String existingOffset = topicPartitionsToOffset.get(tp);
+    // register the older (of the two) offset in the consumer, to guarantee we do not miss any messages.
+    if (existingOffset == null || compareOffsets(existingOffset, offset) > 0) {
+      topicPartitionsToOffset.put(tp, offset);
+    }
+
+    metrics.registerTopicAndPartition(toTopicAndPartition(tp));
+  }
+
+  /**
+   * Compare two String offsets.
+   * Note. There is a method in KafkaSystemAdmin that does that, but that would require instantiation of systemadmin for each consumer.
+   * @return see {@link Long#compareTo(Long)}
+   */
+  private static int compareOffsets(String offset1, String offset2) {
+    return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s:%s", systemName, clientId);
+  }
+
+  @Override
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
+      Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
+
+    // check if the proxy is running
+    if (!proxy.isRunning()) {
+      stop();
+      String message = String.format("%s: KafkaConsumerProxy has stopped.", this);
+      throw new SamzaException(message, proxy.getFailureCause());
+    }
+
+    return super.poll(systemStreamPartitions, timeout);
+  }
+
+  /**
+   * convert from TopicPartition to TopicAndPartition
+   */
+  public static TopicAndPartition toTopicAndPartition(TopicPartition tp) {
+    return new TopicAndPartition(tp.topic(), tp.partition());
+  }
+
+  /**
+   * convert to TopicPartition from SystemStreamPartition
+   */
+  public static TopicPartition toTopicPartition(SystemStreamPartition ssp) {
+    return new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
+  }
+
+  /**
+   * return system name for this consumer
+   * @return system name
+   */
+  public String getSystemName() {
+    return systemName;
+  }
+
+  public class KafkaConsumerMessageSink {
+
+    public void setIsAtHighWatermark(SystemStreamPartition ssp, boolean isAtHighWatermark) {
+      setIsAtHead(ssp, isAtHighWatermark);
+    }
+
+    boolean needsMoreMessages(SystemStreamPartition ssp) {
+      LOG.debug("{}: needsMoreMessages from following SSP: {}. fetchLimitByBytes enabled={}; messagesSizeInQueue={};"
+              + "(limit={}); messagesNumInQueue={}(limit={};", this, ssp, fetchThresholdBytesEnabled,
+          getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, getNumMessagesInQueue(ssp),
+          perPartitionFetchThreshold);
+
+      if (fetchThresholdBytesEnabled) {
+        return getMessagesSizeInQueue(ssp) < perPartitionFetchThresholdBytes;
+      } else {
+        return getNumMessagesInQueue(ssp) < perPartitionFetchThreshold;
+      }
+    }
+
+    void addMessage(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) {
+      LOG.trace("{}: Incoming message ssp = {}: envelope = {}.", this, ssp, envelope);
+
+      try {
+        put(ssp, envelope);
+      } catch (InterruptedException e) {
+        throw new SamzaException(
+            String.format("%s: Consumer was interrupted while trying to add message with offset %s for ssp %s", this,
+                envelope.getOffset(), ssp));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
deleted file mode 100644
index fd84c4a..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system.kafka
-
-import kafka.common.TopicAndPartition
-import org.apache.samza.util.Logging
-import kafka.message.Message
-import kafka.message.MessageAndOffset
-import org.apache.samza.Partition
-import org.apache.kafka.common.utils.Utils
-import org.apache.samza.util.Clock
-import kafka.serializer.DefaultDecoder
-import kafka.serializer.Decoder
-import org.apache.samza.util.BlockingEnvelopeMap
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.system.IncomingMessageEnvelope
-import kafka.consumer.ConsumerConfig
-import org.apache.samza.util.TopicMetadataStore
-import kafka.api.PartitionMetadata
-import kafka.api.TopicMetadata
-import org.apache.samza.util.ExponentialSleepStrategy
-import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConverters._
-import org.apache.samza.system.SystemAdmin
-
-object KafkaSystemConsumer {
-
-  // Approximate additional shallow heap overhead per message in addition to the raw bytes
-  // received from Kafka  4 + 64 + 4 + 4 + 4 = 80 bytes overhead.
-  // As this overhead is a moving target, and not very large
-  // compared to the message size its being ignore in the computation for now.
-  val MESSAGE_SIZE_OVERHEAD =  4 + 64 + 4 + 4 + 4;
-
-  def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
-    val topic = systemStreamPartition.getStream
-    val partitionId = systemStreamPartition.getPartition.getPartitionId
-    TopicAndPartition(topic, partitionId)
-  }
-}
-
-/**
- *  Maintain a cache of BrokerProxies, returning the appropriate one for the
- *  requested topic and partition.
- */
-private[kafka] class KafkaSystemConsumer(
-  systemName: String,
-  systemAdmin: SystemAdmin,
-  metrics: KafkaSystemConsumerMetrics,
-  metadataStore: TopicMetadataStore,
-  clientId: String,
-  timeout: Int = ConsumerConfig.ConsumerTimeoutMs,
-  bufferSize: Int = ConsumerConfig.SocketBufferSize,
-  fetchSize: StreamFetchSizes = new StreamFetchSizes,
-  consumerMinSize: Int = ConsumerConfig.MinFetchBytes,
-  consumerMaxWait: Int = ConsumerConfig.MaxFetchWaitMs,
-
-  /**
-   * Defines a low water mark for how many messages we buffer before we start
-   * executing fetch requests against brokers to get more messages. This value
-   * is divided equally among all registered SystemStreamPartitions. For
-   * example, if fetchThreshold is set to 50000, and there are 50
-   * SystemStreamPartitions registered, then the per-partition threshold is
-   * 1000. As soon as a SystemStreamPartition's buffered message count drops
-   * below 1000, a fetch request will be executed to get more data for it.
-   *
-   * Increasing this parameter will decrease the latency between when a queue
-   * is drained of messages and when new messages are enqueued, but also leads
-   * to an increase in memory usage since more messages will be held in memory.
-   */
-  fetchThreshold: Int = 50000,
-  /**
-   * Defines a low water mark for how many bytes we buffer before we start
-   * executing fetch requests against brokers to get more messages. This
-   * value is divided by 2 because the messages are buffered twice, once in
-   * KafkaConsumer and then in SystemConsumers. This value
-   * is divided equally among all registered SystemStreamPartitions.
-   * However this is a soft limit per partition, as the
-   * bytes are cached at the message boundaries, and the actual usage can be
-   * 1000 bytes + size of max message in the partition for a given stream.
-   * The bytes if the size of the bytebuffer in Message. Hence, the
-   * Object overhead is not taken into consideration. In this codebase
-   * it seems to be quite small. Hence, even for 500000 messages this is around 4MB x 2 = 8MB,
-   * which is not considerable.
-   *
-   * For example,
-   * if fetchThresholdBytes is set to 100000 bytes, and there are 50
-   * SystemStreamPartitions registered, then the per-partition threshold is
-   * (100000 / 2) / 50 = 1000 bytes.
-   * As this is a soft limit, the actual usage can be 1000 bytes + size of max message.
-   * As soon as a SystemStreamPartition's buffered messages bytes drops
-   * below 1000, a fetch request will be executed to get more data for it.
-   *
-   * Increasing this parameter will decrease the latency between when a queue
-   * is drained of messages and when new messages are enqueued, but also leads
-   * to an increase in memory usage since more messages will be held in memory.
-   *
-   * The default value is -1, which means this is not used. When the value
-   * is > 0, then the fetchThreshold which is count based is ignored.
-   */
-  fetchThresholdBytes: Long = -1,
-  /**
-   * if(fetchThresholdBytes > 0) true else false
-   */
-  fetchLimitByBytesEnabled: Boolean = false,
-  offsetGetter: GetOffset = new GetOffset("fail"),
-  deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
-  keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
-  retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
-  clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(
-    metrics.registry,
-    new Clock {
-      def currentTimeMillis = clock()
-    },
-    classOf[KafkaSystemConsumerMetrics].getName) with Toss with Logging {
-
-  type HostPort = (String, Int)
-  val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()
-  val topicPartitionsAndOffsets: scala.collection.concurrent.Map[TopicAndPartition, String] = new ConcurrentHashMap[TopicAndPartition, String]().asScala
-  var perPartitionFetchThreshold = fetchThreshold
-  var perPartitionFetchThresholdBytes = 0L
-
-  def start() {
-    if (topicPartitionsAndOffsets.size > 0) {
-      perPartitionFetchThreshold = fetchThreshold / topicPartitionsAndOffsets.size
-      // messages get double buffered, hence divide by 2
-      if(fetchLimitByBytesEnabled) {
-        perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / topicPartitionsAndOffsets.size
-      }
-    }
-
-    systemAdmin.start()
-    refreshBrokers
-  }
-
-  override def register(systemStreamPartition: SystemStreamPartition, offset: String) {
-    super.register(systemStreamPartition, offset)
-
-    val topicAndPartition = KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition)
-    val existingOffset = topicPartitionsAndOffsets.getOrElseUpdate(topicAndPartition, offset)
-    // register the older offset in the consumer
-    if (systemAdmin.offsetComparator(existingOffset, offset) >= 0) {
-      topicPartitionsAndOffsets.replace(topicAndPartition, offset)
-    }
-
-    metrics.registerTopicAndPartition(KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition))
-  }
-
-  def stop() {
-    systemAdmin.stop()
-    brokerProxies.values.foreach(_.stop)
-  }
-
-  protected def createBrokerProxy(host: String, port: Int): BrokerProxy = {
-    info("Creating new broker proxy for host: %s and port: %s" format(host, port))
-    new BrokerProxy(host, port, systemName, clientId, metrics, sink, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter)
-  }
-
-  protected def getPartitionMetadata(topicMetadata: TopicMetadata, partition: Int): Option[PartitionMetadata] = {
-    topicMetadata.partitionsMetadata.find(_.partitionId == partition)
-  }
-
-  protected def getLeaderHostPort(partitionMetadata: Option[PartitionMetadata]): Option[(String, Int)] = {
-    // Whatever we do, we can't say Broker, even though we're
-    // manipulating it here. Broker is a private type and Scala doesn't seem
-    // to care about that as long as you don't explicitly declare its type.
-    val brokerOption = partitionMetadata.flatMap(_.leader)
-
-    brokerOption match {
-      case Some(broker) => Some(broker.host, broker.port)
-      case _ => None
-    }
-  }
-
-  def refreshBrokers {
-    var tpToRefresh = topicPartitionsAndOffsets.keySet.toList
-    info("Refreshing brokers for: %s" format topicPartitionsAndOffsets)
-    retryBackoff.run(
-      loop => {
-        val topics = tpToRefresh.map(_.topic).toSet
-        val topicMetadata = TopicMetadataCache.getTopicMetadata(topics, systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
-
-        // addTopicPartition one at a time, leaving the to-be-done list intact in case of exceptions.
-        // This avoids trying to re-add the same topic partition repeatedly
-        def refresh() = {
-          val head = tpToRefresh.head
-          // refreshBrokers can be called from abdicate and refreshDropped,
-          // both of which are triggered from BrokerProxy threads. To prevent
-          // accidentally creating multiple objects for the same broker, or
-          // accidentally not updating the topicPartitionsAndOffsets variable,
-          // we need to lock.
-          this.synchronized {
-            // Check if we still need this TopicAndPartition inside the
-            // critical section. If we don't, then notAValidEvent it.
-            topicPartitionsAndOffsets.get(head) match {
-              case Some(nextOffset) =>
-                val partitionMetadata = getPartitionMetadata(topicMetadata(head.topic), head.partition)
-                getLeaderHostPort(partitionMetadata) match {
-                  case Some((host, port)) =>
-                    debug("Got partition metadata for %s: %s" format(head, partitionMetadata.get))
-                    val brokerProxy = brokerProxies.getOrElseUpdate((host, port), createBrokerProxy(host, port))
-                    brokerProxy.addTopicPartition(head, Option(nextOffset))
-                    brokerProxy.start
-                    debug("Claimed topic-partition (%s) for (%s)".format(head, brokerProxy))
-                    topicPartitionsAndOffsets -= head
-                  case None => info("No metadata available for: %s. Will try to refresh and add to a consumer thread later." format head)
-                }
-              case _ => debug("Ignoring refresh for %s because we already added it from another thread." format head)
-            }
-          }
-          tpToRefresh.tail
-        }
-
-        while (!tpToRefresh.isEmpty) {
-          tpToRefresh = refresh()
-        }
-
-        loop.done
-      },
-
-      (exception, loop) => {
-        warn("While refreshing brokers for %s: %s. Retrying." format (tpToRefresh.head, exception))
-        debug("Exception detail:", exception)
-      })
-  }
-
-  val sink = new MessageSink {
-    var lastDroppedRefresh = clock()
-
-    def refreshDropped() {
-      if (topicPartitionsAndOffsets.size > 0 && clock() - lastDroppedRefresh > 10000) {
-        refreshBrokers
-        lastDroppedRefresh = clock()
-      }
-    }
-
-    def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
-      setIsAtHead(toSystemStreamPartition(tp), isAtHighWatermark)
-    }
-
-    def needsMoreMessages(tp: TopicAndPartition) = {
-      if(fetchLimitByBytesEnabled) {
-        getMessagesSizeInQueue(toSystemStreamPartition(tp)) < perPartitionFetchThresholdBytes
-      } else {
-        getNumMessagesInQueue(toSystemStreamPartition(tp)) < perPartitionFetchThreshold
-      }
-    }
-
-    def getMessageSize(message: Message): Integer = {
-      message.size + KafkaSystemConsumer.MESSAGE_SIZE_OVERHEAD
-    }
-
-    def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) = {
-      trace("Incoming message %s: %s." format (tp, msg))
-
-      val systemStreamPartition = toSystemStreamPartition(tp)
-      val isAtHead = highWatermark == msg.offset
-      val offset = msg.offset.toString
-      val key = if (msg.message.key != null) {
-        keyDeserializer.fromBytes(Utils.readBytes(msg.message.key))
-      } else {
-        null
-      }
-      val message = if (!msg.message.isNull) {
-        deserializer.fromBytes(Utils.readBytes(msg.message.payload))
-      } else {
-        null
-      }
-
-      if(fetchLimitByBytesEnabled ) {
-        val ime = new IncomingMessageEnvelope(systemStreamPartition, offset, key, message, getMessageSize(msg.message))
-        ime.setTimestamp(if (!msg.message.isNull) msg.message.timestamp else 0L)
-        put(systemStreamPartition, ime)
-      } else {
-        val ime = new IncomingMessageEnvelope(systemStreamPartition, offset, key, message)
-        ime.setTimestamp(if (!msg.message.isNull) msg.message.timestamp else 0L)
-        put(systemStreamPartition, ime)
-      }
-
-      setIsAtHead(systemStreamPartition, isAtHead)
-    }
-
-    def abdicate(tp: TopicAndPartition, nextOffset: Long) {
-      info("Abdicating for %s" format (tp))
-      topicPartitionsAndOffsets += tp -> nextOffset.toString
-      refreshBrokers
-    }
-
-    private def toSystemStreamPartition(tp: TopicAndPartition) = {
-      new SystemStreamPartition(systemName, tp.topic, new Partition(tp.partition))
-    }
-  }
-}