Posted to commits@samza.apache.org by bo...@apache.org on 2018/09/26 00:23:22 UTC

[23/29] samza git commit: review comments

review comments


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5397a34e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5397a34e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5397a34e

Branch: refs/heads/NewKafkaSystemConsumer
Commit: 5397a34e2a6a5df0d7ae088ec2b309e65b53b4e7
Parents: 1d1fb89
Author: Boris S <bo...@apache.org>
Authored: Mon Sep 24 10:54:27 2018 -0700
Committer: Boris S <bo...@apache.org>
Committed: Mon Sep 24 10:54:27 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/container/SamzaContainer.scala |   2 +-
 .../clients/consumer/KafkaConsumerConfig.java   | 194 ----------------
 .../samza/config/KafkaConsumerConfig.java       | 198 +++++++++++++++++
 .../samza/system/kafka/KafkaConsumerProxy.java  | 220 +++++++++----------
 .../samza/system/kafka/KafkaSystemConsumer.java | 187 ++++++++--------
 .../kafka/KafkaSystemConsumerMetrics.scala      |   2 +-
 .../samza/system/kafka/KafkaSystemFactory.scala |   4 +-
 .../consumer/TestKafkaConsumerConfig.java       | 137 ------------
 .../samza/config/TestKafkaConsumerConfig.java   | 152 +++++++++++++
 .../system/kafka/TestKafkaSystemConsumer.java   |  12 +-
 10 files changed, 552 insertions(+), 556 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index e71fcb3..fba7329 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -822,7 +822,7 @@ class SamzaContainer(
     }
 
     try {
-      info("Shutting down SamzaContaier.")
+      info("Shutting down SamzaContainer.")
       removeShutdownHook
 
       jmxServer.stop

http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
deleted file mode 100644
index 8ada1b4..0000000
--- a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.kafka.clients.consumer;
-
-import java.util.Map;
-import java.util.Properties;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigException;
-import org.apache.samza.config.JobConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-
-
-/**
- * The configuration class for KafkaConsumer
- */
-public class KafkaConsumerConfig extends ConsumerConfig {
-
-  public static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerConfig.class);
-
-  private static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer";
-  private static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer";
-  private static final String ADMIN_CLIENT_ID_PREFIX = "samza-admin";
-  private static final String SAMZA_OFFSET_LARGEST = "largest";
-  private static final String SAMZA_OFFSET_SMALLEST = "smallest";
-  private static final String KAFKA_OFFSET_LATEST = "latest";
-  private static final String KAFKA_OFFSET_EARLIEST = "earliest";
-  private static final String KAFKA_OFFSET_NONE = "none";
-
-  /*
-   * By default, KafkaConsumer will fetch ALL available messages for all the partitions.
-   * This may cause memory issues. That's why we will limit the number of messages per partition we get on EACH poll().
-   */
-  static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100";
-
-  private KafkaConsumerConfig(Properties props) {
-    super(props);
-  }
-
-  /**
-   * Create kafka consumer configs, based on the subset of global configs.
-   * @param config
-   * @param systemName
-   * @param clientId
-   * @param injectProps
-   * @return KafkaConsumerConfig
-   */
-  public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, String systemName, String clientId,
-      Map<String, String> injectProps) {
-
-    final Config subConf = config.subset(String.format("systems.%s.consumer.", systemName), true);
-
-    final String groupId = getConsumerGroupId(config);
-
-    final Properties consumerProps = new Properties();
-    consumerProps.putAll(subConf);
-
-    consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-    consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
-
-    //Kafka client configuration
-
-    // put overrides
-    consumerProps.putAll(injectProps);
-
-    // These are values we enforce in sazma, and they cannot be overwritten.
-
-    // Disable consumer auto-commit because Samza controls commits
-    consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
-    // Translate samza config value to kafka config value
-    consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-        getAutoOffsetResetValue(consumerProps));
-
-    // make sure bootstrap configs are in ?? SHOULD WE FAIL IF THEY ARE NOT?
-    if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
-      // get it from the producer config
-      String bootstrapServers =
-          config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-      if (StringUtils.isEmpty(bootstrapServers)) {
-        throw new SamzaException("Missing " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " config  for " + systemName);
-      }
-      consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-    }
-
-    // Always use default partition assignment strategy. Do not allow override.
-    consumerProps.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
-
-    // the consumer is fully typed, and deserialization can be too. But in case it is not provided we should
-    // default to byte[]
-    if (!consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
-      LOG.info("setting default key serialization for the consumer(for {}) to ByteArrayDeserializer", systemName);
-      consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-    }
-    if (!consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
-      LOG.info("setting default value serialization for the consumer(for {}) to ByteArrayDeserializer", systemName);
-      consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-    }
-
-    // Override default max poll config if there is no value
-    consumerProps.computeIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
-        (k) -> DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS);
-
-    return new KafkaConsumerConfig(consumerProps);
-  }
-
-  // group id should be unique per job
-  static String getConsumerGroupId(Config config) {
-    JobConfig jobConfig = new JobConfig(config);
-    Option<String> jobIdOption = jobConfig.getJobId();
-    Option<String> jobNameOption = jobConfig.getName();
-    return (jobNameOption.isDefined() ? jobNameOption.get() : "undefined_job_name") + "-" + (jobIdOption.isDefined()
-        ? jobIdOption.get() : "undefined_job_id");
-  }
-
-  // client id should be unique per job
-  public static String getConsumerClientId(Config config) {
-    return getConsumerClientId(CONSUMER_CLIENT_ID_PREFIX, config);
-  }
-  public static String getProducerClientId(Config config) {
-    return getConsumerClientId(PRODUCER_CLIENT_ID_PREFIX, config);
-  }
-  public static String getAdminClientId(Config config) {
-    return getConsumerClientId(ADMIN_CLIENT_ID_PREFIX, config);
-  }
-
-  private static String getConsumerClientId(String id, Config config) {
-    if (config.get(JobConfig.JOB_NAME()) == null) {
-      throw new ConfigException("Missing job name");
-    }
-    String jobName = config.get(JobConfig.JOB_NAME());
-    String jobId = (config.get(JobConfig.JOB_ID()) != null) ? config.get(JobConfig.JOB_ID()) : "1";
-
-    return String.format("%s-%s-%s", id.replaceAll("[^A-Za-z0-9]", "_"), jobName.replaceAll("[^A-Za-z0-9]", "_"),
-        jobId.replaceAll("[^A-Za-z0-9]", "_"));
-  }
-
-  /**
-   * If settings for auto.reset in samza are different from settings in Kafka (auto.offset.reset),
-   * then need to convert them (see kafka.apache.org/documentation):
-   * "largest" -> "latest"
-   * "smallest" -> "earliest"
-   *
-   * If no setting specified we return "latest" (same as Kafka).
-   * @param properties All consumer related {@link Properties} parsed from samza config
-   * @return String representing the config value for "auto.offset.reset" property
-   */
-  static String getAutoOffsetResetValue(Properties properties) {
-    String autoOffsetReset = properties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KAFKA_OFFSET_LATEST);
-
-    // accept kafka values directly
-    if (autoOffsetReset.equals(KAFKA_OFFSET_EARLIEST) || autoOffsetReset.equals(KAFKA_OFFSET_LATEST)
-        || autoOffsetReset.equals(KAFKA_OFFSET_NONE)) {
-      return autoOffsetReset;
-    }
-
-    String newAutoOffsetReset;
-    switch (autoOffsetReset) {
-      case SAMZA_OFFSET_LARGEST:
-        newAutoOffsetReset =  KAFKA_OFFSET_LATEST;
-        break;
-      case SAMZA_OFFSET_SMALLEST:
-        newAutoOffsetReset =  KAFKA_OFFSET_EARLIEST;
-        break;
-      default:
-        newAutoOffsetReset =  KAFKA_OFFSET_LATEST;
-    }
-    LOG.info("AutoOffsetReset value converted from {} to {}", autoOffsetReset,  newAutoOffsetReset);
-    return newAutoOffsetReset;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
new file mode 100644
index 0000000..4bbe00f
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
@@ -0,0 +1,198 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.config;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.JobConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+
+/**
+ * The configuration class for KafkaConsumer
+ */
+public class KafkaConsumerConfig extends HashMap<String, Object> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerConfig.class);
+
+  private static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer";
+  private static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer";
+  private static final String ADMIN_CLIENT_ID_PREFIX = "samza-admin";
+
+  /*
+   * By default, KafkaConsumer may fetch a large number of available messages for all the partitions.
+   * This may cause memory issues, so we limit the number of messages per partition we get on EACH poll().
+   */
+  static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100";
+
+  private KafkaConsumerConfig(Map<String, Object> map) {
+    super(map);
+  }
+
+  /**
+   * This is a helper method to create the configs for use in the Kafka consumer.
+   * The values are based on the "consumer" subset of the configs provided by the app and Samza overrides.
+   *
+   * @param config - config provided by the app.
+   * @param systemName - system name for which the consumer is configured.
+   * @param clientId - client id to be used in the Kafka consumer.
+   * @return KafkaConsumerConfig
+   */
+  public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, String systemName, String clientId) {
+
+    Config subConf = config.subset(String.format("systems.%s.consumer.", systemName), true);
+
+    //Kafka client configuration
+    String groupId = getConsumerGroupId(config);
+
+    Map<String, Object> consumerProps = new HashMap<>();
+    consumerProps.putAll(subConf);
+
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+    consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+
+
+    // These are values we enforce in Samza, and they cannot be overwritten.
+
+    // Disable consumer auto-commit because Samza controls commits
+    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+    // Translate samza config value to kafka config value
+    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+        getAutoOffsetResetValue((String)consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)));
+
+    // make sure the bootstrap servers are set; if not, get them from the producer config
+    if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+      String bootstrapServers =
+          config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+      if (StringUtils.isEmpty(bootstrapServers)) {
+        throw new SamzaException("Missing " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " config  for " + systemName);
+      }
+      consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+    }
+
+    // Always use default partition assignment strategy. Do not allow override.
+    consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
+
+    // the consumer is fully typed, and deserialization can be too. But in case it is not provided we should
+    // default to byte[]
+    if (!consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
+      LOG.info("setting key serialization for the consumer(for system {}) to ByteArrayDeserializer", systemName);
+      consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+    }
+    if (!consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
+      LOG.info("setting value serialization for the consumer(for system {}) to ByteArrayDeserializer", systemName);
+      consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+    }
+
+    // Set the default max poll records value if none was configured
+    consumerProps.computeIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
+        (k) -> DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS);
+
+    return new KafkaConsumerConfig(consumerProps);
+  }
+
+  // group id should be unique per job
+  static String getConsumerGroupId(Config config) {
+    JobConfig jobConfig = new JobConfig(config);
+    Option<String> jobIdOption = jobConfig.getJobId();
+    Option<String> jobNameOption = jobConfig.getName();
+    return (jobNameOption.isDefined() ? jobNameOption.get() : "undefined_job_name") + "-" + (jobIdOption.isDefined()
+        ? jobIdOption.get() : "undefined_job_id");
+  }
+
+  // client id should be unique per job
+  public static String getConsumerClientId(Config config) {
+    return getConsumerClientId(CONSUMER_CLIENT_ID_PREFIX, config);
+  }
+
+  public static String getProducerClientId(Config config) {
+    return getConsumerClientId(PRODUCER_CLIENT_ID_PREFIX, config);
+  }
+
+  public static String getAdminClientId(Config config) {
+    return getConsumerClientId(ADMIN_CLIENT_ID_PREFIX, config);
+  }
+
+  static String getConsumerClientId(String id, Config config) {
+    if (config.get(JobConfig.JOB_NAME()) == null) {
+      throw new ConfigException("Missing job name");
+    }
+    String jobName = config.get(JobConfig.JOB_NAME());
+    String jobId = (config.get(JobConfig.JOB_ID()) != null) ? config.get(JobConfig.JOB_ID()) : "1";
+
+    return String.format("%s-%s-%s", id.replaceAll(
+        "\\W", "_"),
+        jobName.replaceAll("\\W", "_"),
+        jobId.replaceAll("\\W", "_"));
+  }
+
+  /**
+   * If settings for auto.reset in samza are different from settings in Kafka (auto.offset.reset),
+   * then need to convert them (see kafka.apache.org/documentation):
+   * "largest" -> "latest"
+   * "smallest" -> "earliest"
+   *
+   * If no setting specified we return "latest" (same as Kafka).
+   * @param autoOffsetReset value from the app provided config
+   * @return String representing the config value for "auto.offset.reset" property
+   */
+  static String getAutoOffsetResetValue(final String autoOffsetReset) {
+    final String SAMZA_OFFSET_LARGEST = "largest";
+    final String SAMZA_OFFSET_SMALLEST = "smallest";
+    final String KAFKA_OFFSET_LATEST = "latest";
+    final String KAFKA_OFFSET_EARLIEST = "earliest";
+    final String KAFKA_OFFSET_NONE = "none";
+
+    if (autoOffsetReset == null) {
+      return KAFKA_OFFSET_LATEST; // return default
+    }
+
+    // accept kafka values directly
+    if (autoOffsetReset.equals(KAFKA_OFFSET_EARLIEST) || autoOffsetReset.equals(KAFKA_OFFSET_LATEST)
+        || autoOffsetReset.equals(KAFKA_OFFSET_NONE)) {
+      return autoOffsetReset;
+    }
+
+    String newAutoOffsetReset;
+    switch (autoOffsetReset) {
+      case SAMZA_OFFSET_LARGEST:
+        newAutoOffsetReset = KAFKA_OFFSET_LATEST;
+        break;
+      case SAMZA_OFFSET_SMALLEST:
+        newAutoOffsetReset = KAFKA_OFFSET_EARLIEST;
+        break;
+      default:
+        newAutoOffsetReset = KAFKA_OFFSET_LATEST;
+    }
+    LOG.info("AutoOffsetReset value converted from {} to {}", autoOffsetReset, newAutoOffsetReset);
+    return newAutoOffsetReset;
+  }
+}
\ No newline at end of file

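For reference, getAutoOffsetResetValue() above translates Samza's legacy offset-reset values into
Kafka's, passes Kafka-native values through, and falls back to "latest" otherwise. A minimal sketch
of the expected behavior (a hypothetical test-style harness, not part of this commit):

    // Hypothetical usage of the package-private helper above:
    assert "latest".equals(KafkaConsumerConfig.getAutoOffsetResetValue("largest"));    // samza -> kafka
    assert "earliest".equals(KafkaConsumerConfig.getAutoOffsetResetValue("smallest")); // samza -> kafka
    assert "none".equals(KafkaConsumerConfig.getAutoOffsetResetValue("none"));         // kafka value passes through
    assert "latest".equals(KafkaConsumerConfig.getAutoOffsetResetValue(null));         // no setting -> default
    assert "latest".equals(KafkaConsumerConfig.getAutoOffsetResetValue("bogus"));      // unknown -> default

    // Client ids are sanitized with the same "\W" -> "_" rule; e.g. with job.name=my-job, job.id=i001:
    // getConsumerClientId(config) returns "kafka_consumer-my_job-i001" (illustrative values).
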
http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index b67df0a..d2f7096 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import javax.print.DocFlavor;
 import kafka.common.KafkaException;
 import kafka.common.TopicAndPartition;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -47,7 +48,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Separate thread that reads messages from kafka and puts them into the BlockingEnvelopeMap.
+ * This class contains a separate thread that reads messages from kafka and puts them into the BlockingEnvelopeMap
+ * through the KafkaSystemConsumer.KafkaConsumerMessageSink object.
  * This class is not thread safe. There will be only one instance of this class per KafkaSystemConsumer object.
  * We still need some synchronization around kafkaConsumer. See pollConsumer() method for details.
  */
@@ -74,7 +76,8 @@ import org.slf4j.LoggerFactory;
   private volatile Throwable failureCause = null;
   private final CountDownLatch consumerPollThreadStartLatch = new CountDownLatch(1);
 
-  /* package private */KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, String clientId,
+  // package private constructor
+  KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, String clientId,
       KafkaSystemConsumer.KafkaConsumerMessageSink messageSink, KafkaSystemConsumerMetrics samzaConsumerMetrics,
       String metricName) {
 
@@ -93,6 +96,11 @@ import org.slf4j.LoggerFactory;
         "Samza KafkaConsumerProxy Poll " + consumerPollThread.getName() + " - " + systemName);
   }
 
+  @Override
+  public String toString() {
+    return String.format("consumerProxy-%s-%s", systemName, clientId);
+  }
+
   public void start() {
     if (!consumerPollThread.isAlive()) {
       LOG.info("Starting KafkaConsumerProxy polling thread for system " + systemName + " " + this.toString());
@@ -108,12 +116,43 @@ import org.slf4j.LoggerFactory;
         }
       }
     } else {
-      LOG.debug("Tried to start an already started KafkaConsumerProxy (%s). Ignoring.", this.toString());
+      LOG.warn("Tried to start an already started KafkaConsumerProxy (%s). Ignoring.", this.toString());
+    }
+
+    if (topicPartitions2SSP.size() == 0) {
+      String msg = String.format("Cannot start empty set of TopicPartitions for system %s, clientId %s",
+          systemName, clientId);
+      LOG.error(msg);
+      throw new SamzaException(msg);
     }
   }
 
-  // add new partition to the list of polled partitions
-  // this method is called only at the beginning, before the thread is started
+  /**
+   * Stop the thread and wait for it to stop.
+   * @param timeoutMs how long to wait in join
+   */
+  public void stop(long timeoutMs) {
+    LOG.info("Shutting down KafkaConsumerProxy poll thread:" + consumerPollThread.getName());
+
+    isRunning = false;
+    try {
+      consumerPollThread.join(timeoutMs);
+      // join returns even if the thread didn't finish
+      // in this case we should interrupt it and wait again
+      if (consumerPollThread.isAlive()) {
+        consumerPollThread.interrupt();
+        consumerPollThread.join(timeoutMs);
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Join in KafkaConsumerProxy has failed", e);
+      consumerPollThread.interrupt();
+    }
+  }
+
+  /**
+   * Add new partition to the list of polled partitions.
+   * This method should be called only at the beginning, before the thread is started.
+   */
   public void addTopicPartition(SystemStreamPartition ssp, long nextOffset) {
     LOG.info(String.format("Adding new topic and partition %s, offset = %s to queue for consumer %s", ssp, nextOffset,
         this));
@@ -124,67 +163,13 @@ import org.slf4j.LoggerFactory;
 
     nextOffsets.put(ssp, nextOffset);
 
-    // we reuse existing metrics. They assume host and port for the broker
-    // for now fake the port with the consumer name
     kafkaConsumerMetrics.setTopicPartitionValue(metricName, nextOffsets.size());
   }
 
-  /**
-   * creates a separate thread for pulling messages
-   */
-  private Runnable createProxyThreadRunnable() {
-    Runnable runnable=  () -> {
-      isRunning = true;
-
-      try {
-        consumerPollThreadStartLatch.countDown();
-        LOG.info("Starting runnable " + consumerPollThread.getName());
-        initializeLags();
-        while (isRunning) {
-          fetchMessages();
-        }
-      } catch (Throwable throwable) {
-        LOG.error(String.format("Error in KafkaConsumerProxy poll thread for system: %s.", systemName), throwable);
-        // SamzaKafkaSystemConsumer uses the failureCause to propagate the throwable to the container
-        failureCause = throwable;
-        isRunning = false;
-      }
-
-      if (!isRunning) {
-        LOG.info("Stopping the KafkaConsumerProxy poll thread for system: {}.", systemName);
-      }
-    };
-
-    return runnable;
-  }
-
-  private void initializeLags() {
-    // This is expensive, so only do it once at the beginning. After the first poll, we can rely on metrics for lag.
-    Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions2SSP.keySet());
-    endOffsets.forEach((tp, offset) -> {
-      SystemStreamPartition ssp = topicPartitions2SSP.get(tp);
-      long startingOffset = nextOffsets.get(ssp);
-      // End offsets are the offset of the newest message + 1
-      // If the message we are about to consume is < end offset, we are starting with a lag.
-      long initialLag = endOffsets.get(tp) - startingOffset;
-
-      LOG.info("Initial lag for SSP {} is {} (end={}, startOffset={})", ssp, initialLag, endOffsets.get(tp), startingOffset);
-      latestLags.put(ssp, initialLag);
-      sink.setIsAtHighWatermark(ssp, initialLag == 0);
-    });
-
-    // initialize lag metrics
-    refreshLatencyMetrics();
-  }
-
   // the actual polling of the messages from kafka
-  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> pollConsumer(
+  private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> pollConsumer(
       Set<SystemStreamPartition> systemStreamPartitions, long timeout) {
 
-    if (topicPartitions2SSP.size() == 0) {
-      throw new SamzaException("cannot poll empty set of TopicPartitions");
-    }
-
     // Since we need to poll only from some subset of TopicPartitions (passed as the argument),
     // we need to pause the rest.
     List<TopicPartition> topicPartitionsToPause = new ArrayList<>();
@@ -201,10 +186,9 @@ import org.slf4j.LoggerFactory;
     }
 
     ConsumerRecords<K, V> records;
-    // make a call on the client
+
     try {
-      // Currently, when doing checkpoint we are making a safeOffset request through this client, thus we need to synchronize
-      // them. In the future we may use this client for the actually checkpointing.
+      // Synchronize, in case the consumer is used in some other thread (metadata or something else)
       synchronized (kafkaConsumer) {
         // Since we are not polling from ALL the subscribed topics, we need to "change" the subscription temporarily
         kafkaConsumer.pause(topicPartitionsToPause);
@@ -213,12 +197,7 @@ import org.slf4j.LoggerFactory;
         // resume original set of subscription - may be required for checkpointing
         kafkaConsumer.resume(topicPartitionsToPause);
       }
-    } catch (InvalidOffsetException e) {
-      // If the consumer has thrown this exception it means that auto reset is not set for this consumer.
-      // So we just rethrow.
-      LOG.error("Caught InvalidOffsetException in pollConsumer", e);
-      throw e;
-    } catch (KafkaException e) {
+    } catch (Exception e) {
       // we may get InvalidOffsetException | AuthorizationException | KafkaException exceptions,
       // but we still just rethrow, and log it up the stack.
       LOG.error("Caught a Kafka exception in pollConsumer", e);
@@ -230,11 +209,10 @@ import org.slf4j.LoggerFactory;
 
   private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> processResults(ConsumerRecords<K, V> records) {
     if (records == null) {
-      throw new SamzaException("processResults is called with null object for records");
+      throw new SamzaException("ERROR:records is null, after pollConsumer call (in processResults)");
     }
 
-    int capacity = (int) (records.count() / 0.75 + 1); // to avoid rehash, allocate more then 75% of expected capacity.
-    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> results = new HashMap<>(capacity);
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> results = new HashMap<>(records.count());
     // Parse the returned records and convert them into the IncomingMessageEnvelope.
     // Note. They have been already de-serialized by the consumer.
     for (ConsumerRecord<K, V> record : records) {
@@ -268,6 +246,52 @@ import org.slf4j.LoggerFactory;
     return results;
   }
 
+  // creates a separate thread for getting the messages.
+  private Runnable createProxyThreadRunnable() {
+    Runnable runnable = () -> {
+      isRunning = true;
+
+      try {
+        consumerPollThreadStartLatch.countDown();
+        LOG.info("Starting runnable " + consumerPollThread.getName());
+        initializeLags();
+        while (isRunning) {
+          fetchMessages();
+        }
+      } catch (Throwable throwable) {
+        LOG.error(String.format("Error in KafkaConsumerProxy poll thread for system: %s.", systemName), throwable);
+        // KafkaSystemConsumer uses the failureCause to propagate the throwable to the container
+        failureCause = throwable;
+        isRunning = false;
+      }
+
+      if (!isRunning) {
+        LOG.info("KafkaConsumerProxy for system {} has stopped.", systemName);
+      }
+    };
+
+    return runnable;
+  }
+
+  private void initializeLags() {
+    // This is expensive, so only do it once at the beginning. After the first poll, we can rely on metrics for lag.
+    Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions2SSP.keySet());
+    endOffsets.forEach((tp, offset) -> {
+      SystemStreamPartition ssp = topicPartitions2SSP.get(tp);
+      long startingOffset = nextOffsets.get(ssp);
+      // End offsets are the offset of the newest message + 1
+      // If the message we are about to consume is < end offset, we are starting with a lag.
+      long initialLag = endOffsets.get(tp) - startingOffset;
+
+      LOG.info("Initial lag for SSP {} is {} (end={}, startOffset={})", ssp, initialLag, endOffsets.get(tp), startingOffset);
+      latestLags.put(ssp, initialLag);
+      sink.setIsAtHighWatermark(ssp, initialLag == 0);
+    });
+
+    // initialize lag metrics
+    refreshLatencyMetrics();
+  }
+
   private int getRecordSize(ConsumerRecord<K, V> r) {
     int keySize = (r.key() == null) ? 0 : r.serializedKeySize();
     return keySize + r.serializedValueSize();
@@ -291,9 +315,7 @@ import org.slf4j.LoggerFactory;
     kafkaConsumerMetrics.setHighWatermarkValue(tap, highWatermark);
   }
 
-  /*
-   This method put messages into blockingEnvelopeMap.
-   */
+
   private void moveMessagesToTheirQueue(SystemStreamPartition ssp, List<IncomingMessageEnvelope> envelopes) {
     long nextOffset = nextOffsets.get(ssp);
 
@@ -317,11 +339,9 @@ import org.slf4j.LoggerFactory;
     }
   }
 
-  /*
-    The only way to figure out lag for the KafkaConsumer is to look at the metrics after each poll() call.
-    One of the metrics (records-lag) shows how far behind the HighWatermark the consumer is.
-    This method populates the lag information for each SSP into latestLags member variable.
-   */
+  // The only way to figure out lag for the KafkaConsumer is to look at the metrics after each poll() call.
+  // One of the metrics (records-lag) shows how far behind the HighWatermark the consumer is.
+  // This method populates the lag information for each SSP into latestLags member variable.
   private void populateCurrentLags(Set<SystemStreamPartition> ssps) {
 
     Map<MetricName, ? extends Metric> consumerMetrics = kafkaConsumer.metrics();
@@ -339,12 +359,6 @@ import org.slf4j.LoggerFactory;
       // so the lag is now at least 0, which is the same as Samza's definition.
       // If the lag is not 0, then isAtHead is not true, and kafkaClient keeps polling.
       long currentLag = (currentLagM != null) ? (long) currentLagM.value() : -1L;
-      /*
-      Metric averageLagM = consumerMetrics.get(new MetricName(tp + ".records-lag-avg", "consumer-fetch-manager-metrics", "", tags));
-      double averageLag = (averageLagM != null) ? averageLagM.value() : -1.0;
-      Metric maxLagM = consumerMetrics.get(new MetricName(tp + ".records-lag-max", "consumer-fetch-manager-metrics", "", tags));
-      double maxLag = (maxLagM != null) ? maxLagM.value() : -1.0;
-      */
       latestLags.put(ssp, currentLag);
 
       // calls the setIsAtHead for the BlockingEnvelopeMap
@@ -352,10 +366,8 @@ import org.slf4j.LoggerFactory;
     }
   }
 
-  /*
-    Get the latest lag for a specific SSP.
-   */
-  public long getLatestLag(SystemStreamPartition ssp) {
+  // Get the latest lag for a specific SSP.
+  private long getLatestLag(SystemStreamPartition ssp) {
     Long lag = latestLags.get(ssp);
     if (lag == null) {
       throw new SamzaException("Unknown/unregistered ssp in latestLags request: " + ssp);
@@ -363,9 +375,7 @@ import org.slf4j.LoggerFactory;
     return lag;
   }
 
-  /*
-    Using the consumer to poll the messages from the stream.
-   */
+  // Using the consumer to poll the messages from the stream.
   private void fetchMessages() {
     Set<SystemStreamPartition> sspsToFetch = new HashSet<>();
     for (SystemStreamPartition ssp : nextOffsets.keySet()) {
@@ -380,7 +390,7 @@ import org.slf4j.LoggerFactory;
       Map<SystemStreamPartition, List<IncomingMessageEnvelope>> response;
       LOG.debug("pollConsumer from following SSPs: {}; total#={}", sspsToFetch, sspsToFetch.size());
 
-      response = pollConsumer(sspsToFetch, 500); // TODO should be default value from ConsumerConfig
+      response = pollConsumer(sspsToFetch, 500L);
 
       // move the responses into the queue
       for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> e : response.entrySet()) {
@@ -430,27 +440,5 @@ import org.slf4j.LoggerFactory;
   Throwable getFailureCause() {
     return failureCause;
   }
-
-  /**
-   * stop the thread and wait for it to stop
-   * @param timeoutMs how long to wait in join
-   */
-  public void stop(long timeoutMs) {
-    LOG.info("Shutting down KafkaConsumerProxy poll thread:" + consumerPollThread.getName());
-
-    isRunning = false;
-    try {
-      consumerPollThread.join(timeoutMs);
-      // join returns event if the thread didn't finish
-      // in this case we should interrupt it and wait again
-      if (consumerPollThread.isAlive()) {
-        consumerPollThread.interrupt();
-        consumerPollThread.join(timeoutMs);
-      }
-    } catch (InterruptedException e) {
-      LOG.warn("Join in KafkaConsumerProxy has failed", e);
-      consumerPollThread.interrupt();
-    }
-  }
 }
 

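For reference, pollConsumer() above fetches from only a subset of the assigned TopicPartitions by
pausing the rest around the poll() call. A standalone sketch of that pattern (assuming a
KafkaConsumer already assigned to the partitions in 'assigned', with 'wanted' the subset to fetch;
the synchronization and error handling used in the code above are omitted):

    // Poll only 'wanted' by pausing every other assigned TopicPartition.
    Set<TopicPartition> toPause = new HashSet<>(assigned);
    toPause.removeAll(wanted);
    consumer.pause(toPause);                                       // temporarily exclude the rest
    ConsumerRecords<byte[], byte[]> records = consumer.poll(500L); // fetches only from 'wanted'
    consumer.resume(toPause);                                      // restore the original assignment
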
http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
index 9101a89..e5ded8d 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -31,9 +31,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import kafka.common.TopicAndPartition;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.KafkaConsumerConfig;
+import org.apache.samza.config.KafkaConsumerConfig;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.KafkaConfig;
@@ -56,32 +55,33 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
 
   private final Consumer<K, V> kafkaConsumer;
   private final String systemName;
-  private final KafkaSystemConsumerMetrics samzaConsumerMetrics;
   private final String clientId;
-  private final String metricName;
   private final AtomicBoolean stopped = new AtomicBoolean(false);
   private final AtomicBoolean started = new AtomicBoolean(false);
   private final Config config;
   private final boolean fetchThresholdBytesEnabled;
+  private final KafkaSystemConsumerMetrics metrics;
 
   // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap.
-  /* package private */final KafkaConsumerMessageSink messageSink;
+  final KafkaConsumerMessageSink messageSink;
 
-  // proxy is doing the actual reading
+  // This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates
+  // BlockingEnvelopeMap's buffers.
   final private KafkaConsumerProxy proxy;
 
-  /* package private */final Map<TopicPartition, String> topicPartitions2Offset = new HashMap<>();
-  /* package private */final Map<TopicPartition, SystemStreamPartition> topicPartitions2SSP = new HashMap<>();
 
-  /* package private */ long perPartitionFetchThreshold;
-  /* package private */ long perPartitionFetchThresholdBytes;
+  // keep registration data until the start - mapping between registered SSPs and topicPartitions, and the offsets
+  final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>();
+  final Map<TopicPartition, SystemStreamPartition> topicPartitionsToSSP = new HashMap<>();
+
+  long perPartitionFetchThreshold;
+  long perPartitionFetchThresholdBytes;
 
   /**
-   * Constructor
    * @param systemName system name for which we create the consumer
-   * @param config config
-   * @param metrics metrics
-   * @param clock - system clock
+   * @param config config passed into the app
+   * @param metrics metrics collecting object
+   * @param clock - system clock, allows overriding the internal clock (System.currentTimeMillis())
    */
   public KafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, Config config, String clientId,
       KafkaSystemConsumerMetrics metrics, Clock clock) {
@@ -89,54 +89,50 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     super(metrics.registry(), clock, metrics.getClass().getName());
 
     this.kafkaConsumer = kafkaConsumer;
-    this.samzaConsumerMetrics = metrics;
     this.clientId = clientId;
     this.systemName = systemName;
     this.config = config;
-    this.metricName = String.format("%s %s", systemName, clientId);
+    this.metrics = metrics;
 
-    this.fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName);
+    fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName);
 
     // create a sink for passing the messages between the proxy and the consumer
     messageSink = new KafkaConsumerMessageSink();
 
-    // Create the proxy to do the actual message reading. It is a separate thread that reads the messages from the stream
-    // and puts them into the sink.
-    proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, samzaConsumerMetrics, metricName);
-    LOG.info("Created consumer proxy: " + proxy);
+    // Create the proxy to do the actual message reading.
+    String metricName = String.format("%s %s", systemName, clientId);
+    proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, metrics, metricName);
+    LOG.info("{}: Created KafkaConsumerProxy {} ", this, proxy );
 
-    LOG.info("Created SamzaKafkaSystemConsumer for system={}, clientId={}, metricName={}, KafkaConsumer={}", systemName,
-        clientId, metricName, this.kafkaConsumer.toString());
+    LOG.info("{}: Created KafkaSystemConsumer {}", this, kafkaConsumer);
   }
 
   /**
-   * create kafka consumer
+   * Create internal kafka consumer object, which will be used in the Proxy.
    * @param systemName system name for which we create the consumer
   * @param clientId client id to use in the kafka client
    * @param config config
-   * @return kafka consumer
+   * @return kafka consumer object
    */
   public static KafkaConsumer<byte[], byte[]> getKafkaConsumerImpl(String systemName, String clientId, Config config) {
 
-    Map<String, String> injectProps = new HashMap<>();
-
     // extract kafka client configs
     KafkaConsumerConfig consumerConfig =
-        KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId, injectProps);
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId);
 
-    LOG.info("KafkaClient properties for systemName {}: {}", systemName, consumerConfig.originals());
+    LOG.info("{}:{} KafkaClient properties {}", systemName, clientId, consumerConfig);
 
-    return new KafkaConsumer<>(consumerConfig.originals());
+    return new KafkaConsumer<>(consumerConfig);
   }
 
   @Override
   public void start() {
     if (!started.compareAndSet(false, true)) {
-      LOG.warn("attempting to start the consumer for the second (or more) time.");
+      LOG.warn("{}: Attempting to start the consumer for the second (or more) time.", this);
       return;
     }
     if (stopped.get()) {
-      LOG.warn("attempting to start a stopped consumer");
+      LOG.warn("{}: Attempting to start a stopped consumer", this);
       return;
     }
     // initialize the subscriptions for all the registered TopicPartitions
@@ -145,58 +141,59 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     setFetchThresholds();
 
     startConsumer();
-    LOG.info("consumer {} started", this);
+    LOG.info("{}: Consumer started", this);
   }
 
   private void startSubscription() {
     //subscribe to all the registered TopicPartitions
-    LOG.info("consumer {}, subscribes to {} ", this, topicPartitions2SSP.keySet());
+    LOG.info("{}: Consumer subscribes to {}", this, topicPartitionsToSSP.keySet());
     try {
       synchronized (kafkaConsumer) {
         // we are using assign (and not subscribe), so we need to specify both topic and partition
-        kafkaConsumer.assign(topicPartitions2SSP.keySet());
+        kafkaConsumer.assign(topicPartitionsToSSP.keySet());
       }
     } catch (Exception e) {
-      LOG.warn("startSubscription failed.", e);
+      LOG.warn("{}: Start subscription failed", this);
       throw new SamzaException(e);
     }
   }
 
-  /*
-   Set the offsets to start from.
-   Add the TopicPartitions to the proxy.
-   Start the proxy thread.
+  /**
+   * Set the offsets to start from.
+   * Register the TopicPartitions with the proxy.
+   * Start the proxy.
    */
   void startConsumer() {
-    //set the offset for each TopicPartition
-    if (topicPartitions2Offset.size() <= 0) {
-      LOG.warn("Consumer {} is not subscribed to any SSPs", this);
+    // set the offset for each TopicPartition
+    if (topicPartitionsToOffset.size() <= 0) {
+      LOG.warn("{}: Consumer is not subscribed to any SSPs", this);
     }
 
-    topicPartitions2Offset.forEach((tp, startingOffsetString) -> {
+    topicPartitionsToOffset.forEach((tp, startingOffsetString) -> {
       long startingOffset = Long.valueOf(startingOffsetString);
 
       try {
         synchronized (kafkaConsumer) {
-          // TODO in the future we may need to add special handling here for BEGIN/END_OFFSET
-          // this will call KafkaConsumer.seekToBegin/End()
           kafkaConsumer.seek(tp, startingOffset); // this value should already be the 'upcoming' value
         }
       } catch (Exception e) {
-        // all other exceptions - non recoverable
-        LOG.error("Got Exception while seeking to " + startingOffsetString + " for " + tp, e);
-        throw new SamzaException(e);
+        // all recoverable exceptions are handled by the client.
+        // if we get here there is nothing left to do but bail out.
+        String msg = String.format("%s: Got Exception while seeking to %s for partition %s",
+            this, startingOffsetString, tp);
+        LOG.error(msg, e);
+        throw new SamzaException(msg, e);
       }
 
-      LOG.info("Changing consumer's starting offset for tp = " + tp + " to " + startingOffsetString);
+      LOG.info("{}: Changing consumer's starting offset for tp = %s to %s", this, tp, startingOffsetString);
 
       // add the partition to the proxy
-      proxy.addTopicPartition(topicPartitions2SSP.get(tp), startingOffset);
+      proxy.addTopicPartition(topicPartitionsToSSP.get(tp), startingOffset);
     });
 
     // start the proxy thread
     if (proxy != null && !proxy.isRunning()) {
-      LOG.info("Starting proxy: " + proxy);
+      LOG.info("{}: Starting proxy {}", this, proxy);
       proxy.start();
     }
   }
@@ -209,57 +206,59 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     long fetchThreshold = FETCH_THRESHOLD;
     if (fetchThresholdOption.isDefined()) {
       fetchThreshold = Long.valueOf(fetchThresholdOption.get());
-      LOG.info("fetchThresholdOption is configured. fetchThreshold=" + fetchThreshold);
+      LOG.info("{}: fetchThresholdOption is configured. fetchThreshold={}", this, fetchThreshold);
     }
 
     Option<String> fetchThresholdBytesOption = kafkaConfig.getConsumerFetchThresholdBytes(systemName);
     long fetchThresholdBytes = FETCH_THRESHOLD_BYTES;
     if (fetchThresholdBytesOption.isDefined()) {
       fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get());
-      LOG.info("fetchThresholdBytesOption is configured. fetchThresholdBytes=" + fetchThresholdBytes);
+      LOG.info("{}: fetchThresholdBytesOption is configured. fetchThresholdBytes={}", this, fetchThresholdBytes);
     }
 
-    int numTPs = topicPartitions2SSP.size();
-    assert (numTPs == topicPartitions2Offset.size());
+    int numTPs = topicPartitionsToSSP.size();
+    if (numTPs != topicPartitionsToOffset.size()) {
+      throw new SamzaException("topicPartitionsToSSP.size() doesn't match topicPartitionsToOffset.size()");
+    }
 
-    LOG.info("fetchThresholdBytes = " + fetchThresholdBytes + "; fetchThreshold=" + fetchThreshold);
-    LOG.info("number of topicPartitions " + numTPs);
+    LOG.info("{}: fetchThresholdBytes = {}; fetchThreshold={}; partitions num={}",
+        this, fetchThresholdBytes, fetchThreshold, numTPs);
 
     if (numTPs > 0) {
       perPartitionFetchThreshold = fetchThreshold / numTPs;
-      LOG.info("perPartitionFetchThreshold=" + perPartitionFetchThreshold);
+      LOG.info("{}: perPartitionFetchThreshold={}", this, perPartitionFetchThreshold);
       if (fetchThresholdBytesEnabled) {
         // currently this feature cannot be enabled, because we do not have the size of the messages available.
         // messages get double buffered, hence divide by 2
         perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numTPs;
-        LOG.info("perPartitionFetchThresholdBytes is enabled. perPartitionFetchThresholdBytes="
-            + perPartitionFetchThresholdBytes);
+        LOG.info("{} :perPartitionFetchThresholdBytes is enabled. perPartitionFetchThresholdBytes={}",
+            this, perPartitionFetchThresholdBytes);
       }
     }
   }
 
   @Override
   public void stop() {
-    LOG.info("Stopping Samza kafkaConsumer " + this);
-
     if (!stopped.compareAndSet(false, true)) {
-      LOG.warn("attempting to stop stopped consumer.");
+      LOG.warn("{}: Attempting to stop stopped consumer.", this);
       return;
     }
 
-    // stop the proxy (with 5 minutes timeout)
+    LOG.info("{}: Stopping Samza kafkaConsumer ", this);
+
+    // stop the proxy (with 1 minute timeout)
     if (proxy != null) {
-      LOG.info("Stopping proxy " + proxy);
+      LOG.info("{}: Stopping proxy {}", this, proxy);
       proxy.stop(TimeUnit.SECONDS.toMillis(60));
     }
 
     try {
       synchronized (kafkaConsumer) {
-        LOG.info("Closing kafka consumer " + kafkaConsumer);
+        LOG.info("{}: Closing kafkaSystemConsumer {}", this, kafkaConsumer);
         kafkaConsumer.close();
       }
     } catch (Exception e) {
-      LOG.warn("failed to stop SamzaRawKafkaConsumer + " + this, e);
+      LOG.warn("{}: Failed to stop KafkaSystemConsumer.", this, e);
     }
   }
 
@@ -270,45 +269,45 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
   public void register(SystemStreamPartition systemStreamPartition, String offset) {
     if (started.get()) {
       String msg =
-          String.format("Trying to register partition after consumer has been started. sn=%s, ssp=%s", systemName,
-              systemStreamPartition);
-      LOG.error(msg);
+          String.format("%s: Trying to register partition after consumer has been started. ssp=%s",
+              this, systemStreamPartition);
       throw new SamzaException(msg);
     }
 
     if (!systemStreamPartition.getSystem().equals(systemName)) {
-      LOG.warn("ignoring SSP " + systemStreamPartition + ", because this consumer's system is " + systemName);
+      LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition);
       return;
     }
+    LOG.info("{}: Registering ssp = {} with offset {}", this, systemStreamPartition, offset);
+
     super.register(systemStreamPartition, offset);
 
     TopicPartition tp = toTopicPartition(systemStreamPartition);
 
-    topicPartitions2SSP.put(tp, systemStreamPartition);
+    topicPartitionsToSSP.put(tp, systemStreamPartition);
 
-    LOG.info("Registering ssp = " + systemStreamPartition + " with offset " + offset);
 
-    String existingOffset = topicPartitions2Offset.get(tp);
+    String existingOffset = topicPartitionsToOffset.get(tp);
     // register the older (of the two) offset in the consumer, to guarantee we do not miss any messages.
     if (existingOffset == null || compareOffsets(existingOffset, offset) > 0) {
-      topicPartitions2Offset.put(tp, offset);
+      topicPartitionsToOffset.put(tp, offset);
     }
 
-    samzaConsumerMetrics.registerTopicAndPartition(toTopicAndPartition(tp));
+    metrics.registerTopicAndPartition(toTopicAndPartition(tp));
   }
 
   /**
    * Compare two String offsets.
-   * Note. There is a method in KafkaAdmin that does that, but that would require instantiation of systemadmin for each consumer.
+   * Note. There is a method in KafkaSystemAdmin that does that, but that would require instantiation of systemadmin for each consumer.
    * @return see {@link Long#compareTo(Long)}
    */
-  public static int compareOffsets(String offset1, String offset2) {
+  private static int compareOffsets(String offset1, String offset2) {
     return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
   }
 
   @Override
   public String toString() {
-    return systemName + "/" + clientId + "/" + super.toString();
+    return String.format("%s:%s", systemName, clientId);
   }
 
   @Override
@@ -318,17 +317,11 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     // check if the proxy is running
     if (!proxy.isRunning()) {
       stop();
-      if (proxy.getFailureCause() != null) {
-        String message = "KafkaConsumerProxy has stopped";
-        throw new SamzaException(message, proxy.getFailureCause());
-      } else {
-        LOG.warn("Failure cause is not populated for KafkaConsumerProxy");
-        throw new SamzaException("KafkaConsumerProxy has stopped");
-      }
+      String message = String.format("%s: KafkaConsumerProxy has stopped.", this);
+      throw new SamzaException(message, proxy.getFailureCause());
     }
 
-    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> res = super.poll(systemStreamPartitions, timeout);
-    return res;
+    return super.poll(systemStreamPartitions, timeout);
   }
 
   /**
@@ -353,9 +346,6 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     return systemName;
   }
 
-  ////////////////////////////////////
-  // inner class for the message sink
-  ////////////////////////////////////
   public class KafkaConsumerMessageSink {
 
     public void setIsAtHighWatermark(SystemStreamPartition ssp, boolean isAtHighWatermark) {
@@ -363,8 +353,8 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     }
 
     boolean needsMoreMessages(SystemStreamPartition ssp) {
-        LOG.debug("needsMoreMessages from following SSP: {}. fetchLimitByBytes enabled={}; messagesSizeInQueue={};"
-                + "(limit={}); messagesNumInQueue={}(limit={};", ssp, fetchThresholdBytesEnabled,
+        LOG.debug("{}: needsMoreMessages from following SSP: {}. fetchLimitByBytes enabled={}; messagesSizeInQueue={};"
+                + "(limit={}); messagesNumInQueue={}(limit={};", this, ssp, fetchThresholdBytesEnabled,
             getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, getNumMessagesInQueue(ssp),
             perPartitionFetchThreshold);
 
@@ -376,16 +366,15 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     }
 
     void addMessage(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) {
-      LOG.trace("Incoming message ssp = {}: envelope = {}.", ssp, envelope);
+      LOG.trace("{}: Incoming message ssp = {}: envelope = {}.", this, ssp, envelope);
 
       try {
         put(ssp, envelope);
       } catch (InterruptedException e) {
         throw new SamzaException(
-            String.format("Interrupted while trying to add message with offset %s for ssp %s", envelope.getOffset(),
-                ssp));
+            String.format("%s: Consumer was interrupted while trying to add message with offset %s for ssp %s",
+                this, envelope.getOffset(), ssp));
       }
     }
-  }  // end of KafkaMessageSink class
-  ///////////////////////////////////////////////////////////////////////////
+  }
 }

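For reference, setFetchThresholds() above spreads the job-level thresholds evenly across the
registered partitions, additionally halving the byte threshold because messages get double
buffered. A worked example with hypothetical numbers (the values below are illustrative only, not
defaults asserted by this commit):

    // Hypothetical inputs to the arithmetic in setFetchThresholds():
    long fetchThreshold = 50000;                 // total message threshold for the job
    long fetchThresholdBytes = 100000000L;       // total byte threshold for the job
    int numTPs = 10;                             // registered topic-partitions
    long perPartitionFetchThreshold = fetchThreshold / numTPs;                  // 5000 messages
    long perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numTPs;  // 5000000 bytes
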
http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
index 7dce261..c4552e6 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
@@ -50,7 +50,7 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr
     clientBytesRead.put(clientName, newCounter("%s-bytes-read" format clientName))
     clientReads.put((clientName), newCounter("%s-messages-read" format clientName))
     clientSkippedFetchRequests.put((clientName), newCounter("%s-skipped-fetch-requests" format clientName))
-    topicPartitions.put(clientName, newGauge("%s-topic-partitions" format clientName, 0))
+    topicPartitions.put(clientName, newGauge("%s-registered-topic-partitions" format clientName, 0))
   }
 
   // java friendlier interfaces

http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 5342b08..deaee56 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -22,7 +22,7 @@ package org.apache.samza.system.kafka
 import java.util.Properties
 
 import kafka.utils.ZkUtils
-import org.apache.kafka.clients.consumer.{KafkaConsumer, KafkaConsumerConfig}
+import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.samza.SamzaException
 import org.apache.samza.config.ApplicationConfig.ApplicationMode
@@ -30,7 +30,7 @@ import org.apache.samza.config.KafkaConfig.Config2Kafka
 import org.apache.samza.config.StorageConfig._
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.{ApplicationConfig, Config, KafkaConfig, StreamConfig}
+import org.apache.samza.config._
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, SystemProducer}
 import org.apache.samza.util._

http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java b/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java
deleted file mode 100644
index 264098b..0000000
--- a/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-
-public class TestKafkaConsumerConfig {
-  private final Map<String, String> props = new HashMap<>();
-  public final static String SYSTEM_NAME = "testSystem";
-  public final static String KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer.";
-  public final static String KAFKA_CONSUMER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".consumer.";
-  private final static String CLIENT_ID = "clientId";
-
-  @Before
-  public void setProps() {
-
-  }
-
-  @Test
-  public void testDefaultsAndOverrides() {
-
-    Map<String, String> overrides = new HashMap<>();
-    overrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // should be ignored
-    overrides.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "Ignore"); // should be ignored
-    overrides.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); // should NOT be ignored
-
-    // if KAFKA_CONSUMER_PROPERTY_PREFIX is set, then PRODUCER should be ignored
-    props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "ignroeThis:9092");
-    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + "bootstrap.servers", "useThis:9092");
-
-    // should be overridden
-    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "true"); //ignore
-    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000"); // ignore
-
-
-    // should be overridden
-    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "200");
-
-    Config config = new MapConfig(props);
-    KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(
-        config, SYSTEM_NAME, CLIENT_ID, overrides);
-
-    Assert.assertEquals(kafkaConsumerConfig.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), false);
-
-    Assert.assertEquals(
-        kafkaConsumerConfig.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
-        Integer.valueOf(KafkaConsumerConfig.DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS));
-
-    Assert.assertEquals(
-        kafkaConsumerConfig.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG).get(0),
-        RangeAssignor.class.getName());
-
-    Assert.assertEquals(
-        kafkaConsumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0),
-        "useThis:9092");
-    Assert.assertEquals(
-        kafkaConsumerConfig.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG).longValue(),
-        100);
-
-    Assert.assertEquals(
-        kafkaConsumerConfig.getClass(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG),
-        ByteArrayDeserializer.class);
-
-    Assert.assertEquals(
-        kafkaConsumerConfig.getClass(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG),
-        ByteArrayDeserializer.class);
-
-    Assert.assertEquals(
-        kafkaConsumerConfig.getString(ConsumerConfig.CLIENT_ID_CONFIG),
-        CLIENT_ID);
-
-    Assert.assertEquals(
-        kafkaConsumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG),
-        KafkaConsumerConfig.getConsumerGroupId(config));
-  }
-
-  @Test
-  // test stuff that should not be overridden
-  public void testNotOverride() {
-
-    // if KAFKA_CONSUMER_PROPERTY_PREFIX is not set, then PRODUCER should be used
-    props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "useThis:9092");
-    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName());
-    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName());
-
-
-    Config config = new MapConfig(props);
-    KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(
-        config, SYSTEM_NAME, CLIENT_ID, Collections.emptyMap());
-
-    Assert.assertEquals(
-        kafkaConsumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0),
-        "useThis:9092");
-
-    Assert.assertEquals(
-        kafkaConsumerConfig.getClass(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG),
-        TestKafkaConsumerConfig.class);
-
-    Assert.assertEquals(
-        kafkaConsumerConfig.getClass(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG),
-        TestKafkaConsumerConfig.class);
-  }
-
-
-
-  @Test(expected = SamzaException.class)
-  public void testNoBootstrapServers() {
-    KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(
-        new MapConfig(Collections.emptyMap()), SYSTEM_NAME, "clientId", Collections.emptyMap());
-
-    Assert.fail("didn't get exception for the missing config:" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
new file mode 100644
index 0000000..719ea22
--- /dev/null
+++ b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samza.config;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.samza.SamzaException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestKafkaConsumerConfig {
+  private final Map<String, String> props = new HashMap<>();
+  public final static String SYSTEM_NAME = "testSystem";
+  public final static String KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer.";
+  public final static String KAFKA_CONSUMER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".consumer.";
+  private final static String CLIENT_ID = "clientId";
+
+  @Before
+  public void setProps() {
+
+  }
+
+  @Test
+  public void testDefaults() {
+
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // should be ignored
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "Ignore"); // should be ignored
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); // should NOT be ignored
+
+    // if the consumer-prefixed property is set, the producer-prefixed value should be ignored
+    props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "ignoreThis:9092");
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + "bootstrap.servers", "useThis:9092");
+
+    Config config = new MapConfig(props);
+    KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(
+        config, SYSTEM_NAME, CLIENT_ID);
+
+    Assert.assertEquals("false", kafkaConsumerConfig.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+
+    Assert.assertEquals(
+        KafkaConsumerConfig.DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS,
+        kafkaConsumerConfig.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+
+    Assert.assertEquals(
+        RangeAssignor.class.getName(),
+        kafkaConsumerConfig.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
+
+    Assert.assertEquals(
+        "useThis:9092",
+        kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+    Assert.assertEquals(
+        "100",
+        kafkaConsumerConfig.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+
+    Assert.assertEquals(
+        ByteArrayDeserializer.class.getName(),
+        kafkaConsumerConfig.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+
+    Assert.assertEquals(
+        ByteArrayDeserializer.class.getName(),
+        kafkaConsumerConfig.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
+
+    Assert.assertEquals(
+        CLIENT_ID,
+        kafkaConsumerConfig.get(ConsumerConfig.CLIENT_ID_CONFIG));
+
+    Assert.assertEquals(
+        KafkaConsumerConfig.getConsumerGroupId(config),
+        kafkaConsumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG));
+  }
+
+  // verify that explicitly configured values are not overridden by the defaults
+  @Test
+  public void testNotOverride() {
+
+    // if the consumer-prefixed property is not set, fall back to the producer-prefixed value
+    props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "useThis:9092");
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName());
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName());
+
+
+    Config config = new MapConfig(props);
+    KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(
+        config, SYSTEM_NAME, CLIENT_ID);
+
+    Assert.assertEquals(
+        "useThis:9092",
+        kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+
+    Assert.assertEquals(
+        TestKafkaConsumerConfig.class.getName(),
+        kafkaConsumerConfig.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+
+    Assert.assertEquals(
+        TestKafkaConsumerConfig.class.getName(),
+        kafkaConsumerConfig.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
+  }
+
+  @Test
+  public void testGetConsumerClientId() {
+    Map<String, String> map = new HashMap<>();
+
+    map.put(JobConfig.JOB_NAME(), "jobName");
+    map.put(JobConfig.JOB_ID(), "jobId");
+    String result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
+    Assert.assertEquals("consumer-jobName-jobId", result);
+
+    result = KafkaConsumerConfig.getConsumerClientId("consumer-", new MapConfig(map));
+    Assert.assertEquals("consumer_-jobName-jobId", result);
+
+    result = KafkaConsumerConfig.getConsumerClientId("super-duper-consumer", new MapConfig(map));
+    Assert.assertEquals("super_duper_consumer-jobName-jobId", result);
+
+    map.put(JobConfig.JOB_NAME(), " very important!job");
+    result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
+    Assert.assertEquals("consumer-_very_important_job-jobId", result);
+
+    map.put(JobConfig.JOB_ID(), "number-#3");
+    result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
+    Assert.assertEquals("consumer-_very_important_job-number__3", result);
+  }
+
+
+
+  @Test(expected = SamzaException.class)
+  public void testNoBootstrapServers() {
+    KafkaConsumerConfig.getKafkaSystemConsumerConfig(
+        new MapConfig(Collections.emptyMap()), SYSTEM_NAME, CLIENT_ID);
+
+    Assert.fail("Did not get an exception for the missing config: " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+  }
+}
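
The assertions in testGetConsumerClientId above pin down the client-id construction rule: in each of the prefix, job name, and job id, every character outside [A-Za-z0-9] is replaced with '_', and the three sanitized parts are then joined with '-'. Below is a sketch consistent with those assertions; it is inferred from the test expectations, not copied from the KafkaConsumerConfig implementation, and the class and method names are illustrative.

    // Sketch of a client-id builder consistent with the assertions in
    // testGetConsumerClientId; inferred from the test, not the Samza source.
    public final class ClientIdSketch {
      private ClientIdSketch() { }

      // replace anything that is not a letter or digit with '_'
      static String sanitize(String s) {
        return s.replaceAll("[^A-Za-z0-9]", "_");
      }

      public static String consumerClientId(String prefix, String jobName, String jobId) {
        return sanitize(prefix) + "-" + sanitize(jobName) + "-" + sanitize(jobId);
      }

      public static void main(String[] args) {
        // matches the values asserted in the test above
        System.out.println(consumerClientId("consumer", "jobName", "jobId"));
        // -> consumer-jobName-jobId
        System.out.println(consumerClientId("super-duper-consumer", "jobName", "jobId"));
        // -> super_duper_consumer-jobName-jobId
        System.out.println(consumerClientId("consumer", " very important!job", "number-#3"));
        // -> consumer-_very_important_job-number__3
      }
    }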

http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
index d90bc35..9e8ff44 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
@@ -27,7 +27,7 @@ import java.util.Map;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.KafkaConsumerConfig;
+import org.apache.samza.config.KafkaConsumerConfig;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
@@ -67,8 +67,8 @@ public class TestKafkaSystemConsumer {
 
     Config config = new MapConfig(map);
     KafkaConsumerConfig consumerConfig =
-        KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, TEST_CLIENT_ID, Collections.emptyMap());
-    final KafkaConsumer<byte[], byte[]> kafkaConsumer = new MockKafkaConsumer(consumerConfig.originals());
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, TEST_CLIENT_ID);
+    final KafkaConsumer<byte[], byte[]> kafkaConsumer = new MockKafkaConsumer(consumerConfig);
 
     MockKafkaSystmeCosumer newKafkaSystemConsumer =
         new MockKafkaSystmeCosumer(kafkaConsumer, TEST_SYSTEM, config, TEST_CLIENT_ID,
@@ -116,9 +116,9 @@ public class TestKafkaSystemConsumer {
     consumer.register(ssp1, "3");
     consumer.register(ssp2, "0");
 
-    assertEquals("0", consumer.topicPartitions2Offset.get(KafkaSystemConsumer.toTopicPartition(ssp0)));
-    assertEquals("2", consumer.topicPartitions2Offset.get(KafkaSystemConsumer.toTopicPartition(ssp1)));
-    assertEquals("0", consumer.topicPartitions2Offset.get(KafkaSystemConsumer.toTopicPartition(ssp2)));
+    assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp0)));
+    assertEquals("2", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp1)));
+    assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp2)));
   }
 
   @Test