Posted to commits@samza.apache.org by bo...@apache.org on 2018/09/26 00:23:26 UTC

[27/29] samza git commit: more review comments

more review comments


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/190a3999
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/190a3999
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/190a3999

Branch: refs/heads/NewKafkaSystemConsumer
Commit: 190a39990a9281511e07876636b6c8784337d35a
Parents: 93ca950
Author: Boris S <bo...@apache.org>
Authored: Tue Sep 25 15:55:15 2018 -0700
Committer: Boris S <bo...@apache.org>
Committed: Tue Sep 25 15:55:15 2018 -0700

----------------------------------------------------------------------
 .../samza/config/KafkaConsumerConfig.java       |  47 ++-
 .../samza/system/kafka/KafkaConsumerProxy.java  | 345 +++++++++----------
 .../samza/system/kafka/KafkaSystemConsumer.java |  49 ++-
 .../kafka/KafkaSystemConsumerMetrics.scala      |   2 +-
 .../samza/config/TestKafkaConsumerConfig.java   |  19 +-
 5 files changed, 235 insertions(+), 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/190a3999/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
index 7d2408b..3fa66e5 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
@@ -31,6 +31,7 @@ import org.apache.samza.SamzaException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
+import scala.runtime.AbstractFunction0;
 
 
 /**
@@ -40,9 +41,9 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
 
   public static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerConfig.class);
 
-  private static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer";
-  private static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer";
-  private static final String ADMIN_CLIENT_ID_PREFIX = "samza-admin";
+  static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer";
+  static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer";
+  static final String ADMIN_CLIENT_ID_PREFIX = "samza-admin";
 
   /*
    * By default, KafkaConsumer will fetch some big number of available messages for all the partitions.
@@ -55,12 +56,12 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
   }
 
   /**
-   * This is a help method to create the configs for use in Kafka consumer.
+   * Helper method to create the configs for the Kafka consumer.
    * The values are based on the "consumer" subset of the configs provided by the app and Samza overrides.
    *
-   * @param config - config provided by the app.
-   * @param systemName - system name for which the consumer is configured.
-   * @param clientId - client id to be used in the Kafka consumer.
+   * @param config config provided by the app.
+   * @param systemName system name to get the consumer configuration for.
+   * @param clientId client id to be used in the Kafka consumer.
    * @return KafkaConsumerConfig
    */
   public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, String systemName, String clientId) {
@@ -85,7 +86,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
     consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
         getAutoOffsetResetValue((String) consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)));
 
-    // make sure bootstrap configs are in, if not - get them from the producer
+    // if consumer bootstrap servers are not configured, get them from the producer configs
     if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
       String bootstrapServers =
           config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
@@ -119,10 +120,19 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
   // group id should be unique per job
   static String getConsumerGroupId(Config config) {
     JobConfig jobConfig = new JobConfig(config);
-    Option<String> jobIdOption = jobConfig.getJobId();
-    Option<String> jobNameOption = jobConfig.getName();
-    return (jobNameOption.isDefined() ? jobNameOption.get() : "undefined_job_name") + "-" + (jobIdOption.isDefined()
-        ? jobIdOption.get() : "undefined_job_id");
+    Option<String> jobNameOption = jobConfig.getName();
+    if (jobNameOption.isEmpty()) {
+      throw new ConfigException("Missing job name");
+    }
+    String jobName = jobNameOption.get();
+
+    Option<String> jobIdOption = jobConfig.getJobId();
+    String jobId = "1";
+    if (!jobIdOption.isEmpty()) {
+      jobId = jobIdOption.get();
+    }
+
+    return String.format("%s-%s", jobName, jobId);
   }
 
   // client id should be unique per job
@@ -139,11 +149,18 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
   }
 
   static String getConsumerClientId(String id, Config config) {
-    if (config.get(JobConfig.JOB_NAME()) == null) {
+    JobConfig jobConfig = new JobConfig(config);
+    Option<String> jobNameOption = jobConfig.getName();
+    if (jobNameOption.isEmpty()) {
       throw new ConfigException("Missing job name");
     }
-    String jobName = config.get(JobConfig.JOB_NAME());
-    String jobId = (config.get(JobConfig.JOB_ID()) != null) ? config.get(JobConfig.JOB_ID()) : "1";
+    String jobName = jobNameOption.get();
+
+    Option<String> jobIdOption = jobConfig.getJobId();
+    String jobId = "1";
+    if (!jobIdOption.isEmpty()) {
+      jobId = jobIdOption.get();
+    }
 
     return String.format("%s-%s-%s", id.replaceAll("\\W", "_"), jobName.replaceAll("\\W", "_"),
         jobId.replaceAll("\\W", "_"));
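
For reviewers following the client id rework above, a minimal standalone sketch (not part of this commit) of what the sanitization produces; the prefix, job name, and defaulted job id are assumptions for illustration:

    // Hypothetical illustration of getConsumerClientId()-style sanitization:
    // non-word characters in each component are replaced with '_'.
    public class ClientIdSketch {
      static String consumerClientId(String id, String jobName, String jobId) {
        return String.format("%s-%s-%s", id.replaceAll("\\W", "_"),
            jobName.replaceAll("\\W", "_"), jobId.replaceAll("\\W", "_"));
      }

      public static void main(String[] args) {
        // job.id is not configured here, so it falls back to "1",
        // mirroring getConsumerGroupId()/getConsumerClientId() above
        System.out.println(consumerClientId("kafka-consumer", "wiki-stats", "1"));
        // prints: kafka_consumer-wiki_stats-1
      }
    }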

http://git-wip-us.apache.org/repos/asf/samza/blob/190a3999/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index d2f7096..04071c1 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -30,13 +30,10 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import javax.print.DocFlavor;
-import kafka.common.KafkaException;
 import kafka.common.TopicAndPartition;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
@@ -53,21 +50,21 @@ import org.slf4j.LoggerFactory;
  * This class is not thread safe. There will be only one instance of this class per KafkaSystemConsumer object.
  * We still need some synchronization around kafkaConsumer. See pollConsumer() method for details.
  */
-/*package private */class KafkaConsumerProxy<K, V> {
+class KafkaConsumerProxy<K, V> {
   private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerProxy.class);
 
   private static final int SLEEP_MS_WHILE_NO_TOPIC_PARTITION = 100;
 
-  /* package private */ final Thread consumerPollThread;
+  final Thread consumerPollThread;
   private final Consumer<K, V> kafkaConsumer;
   private final KafkaSystemConsumer.KafkaConsumerMessageSink sink;
   private final KafkaSystemConsumerMetrics kafkaConsumerMetrics;
   private final String metricName;
   private final String systemName;
   private final String clientId;
-  private final Map<TopicPartition, SystemStreamPartition> topicPartitions2SSP = new HashMap<>();
+  private final Map<TopicPartition, SystemStreamPartition> topicPartitionToSSP = new HashMap<>();
   private final Map<SystemStreamPartition, MetricName> perPartitionMetrics = new HashMap<>();
-  // list of all the SSPs we poll from, with their next offsets correspondingly.
+  // all the SSPs we poll from, mapped to their next offsets (most recently read offset + 1).
   private final Map<SystemStreamPartition, Long> nextOffsets = new ConcurrentHashMap<>();
   // lags behind the high water mark, as reported by the Kafka consumer.
   private final Map<SystemStreamPartition, Long> latestLags = new HashMap<>();
@@ -76,7 +73,6 @@ import org.slf4j.LoggerFactory;
   private volatile Throwable failureCause = null;
   private final CountDownLatch consumerPollThreadStartLatch = new CountDownLatch(1);
 
-  // package private constructor
   KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, String clientId,
       KafkaSystemConsumer.KafkaConsumerMessageSink messageSink, KafkaSystemConsumerMetrics samzaConsumerMetrics,
       String metricName) {
@@ -96,14 +92,46 @@ import org.slf4j.LoggerFactory;
         "Samza KafkaConsumerProxy Poll " + consumerPollThread.getName() + " - " + systemName);
   }
 
-  @Override
-  public String toString() {
-    return String.format("consumerProxy-%s-%s", systemName, clientId);
+  /**
+   * Add a new partition to the list of polled partitions.
+   * Must only be called before {@link KafkaConsumerProxy#start} is called.
+   */
+  public void addTopicPartition(SystemStreamPartition ssp, long nextOffset) {
+    LOG.info(String.format("Adding new topicPartition %s with offset %s to queue for consumer %s", ssp, nextOffset,
+        this));
+    topicPartitionToSSP.put(KafkaSystemConsumer.toTopicPartition(ssp), ssp); //registered SSPs
+
+    // this is already vetted offset so there is no need to validate it
+    nextOffsets.put(ssp, nextOffset);
+
+    kafkaConsumerMetrics.setNumTopicPartitions(metricName, nextOffsets.size());
+  }
+
+  /**
+   * Stop this KafkaConsumerProxy and wait for at most {@code timeoutMs}.
+   * @param timeoutMs maximum time to wait to stop this KafkaConsumerProxy
+   */
+  public void stop(long timeoutMs) {
+    LOG.info("Shutting down KafkaConsumerProxy poll thread {} for {}", consumerPollThread.getName(), this);
+
+    isRunning = false;
+    try {
+      consumerPollThread.join(timeoutMs / 2);
+      // join() may timeout
+      // in this case we should interrupt it and wait again
+      if (consumerPollThread.isAlive()) {
+        consumerPollThread.interrupt();
+        consumerPollThread.join(timeoutMs / 2);
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Join in KafkaConsumerProxy has failed", e);
+      consumerPollThread.interrupt();
+    }
   }
 
   public void start() {
     if (!consumerPollThread.isAlive()) {
-      LOG.info("Starting KafkaConsumerProxy polling thread for system " + systemName + " " + this.toString());
+      LOG.info("Starting KafkaConsumerProxy polling thread for " + this.toString());
 
       consumerPollThread.start();
 
@@ -112,70 +140,124 @@ import org.slf4j.LoggerFactory;
         try {
           consumerPollThreadStartLatch.await(3000, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
-          LOG.info("Got InterruptedException", e);
+          LOG.info("Ignoring InterruptedException while waiting for consumer poll thread to start.", e);
         }
       }
     } else {
       LOG.warn("Tried to start an already started KafkaConsumerProxy (%s). Ignoring.", this.toString());
     }
 
-    if (topicPartitions2SSP.size() == 0) {
-      String msg = String.format("Cannot start empty set of TopicPartitions for system %s, clientid %s",
-          systemName, clientId);
+    if (topicPartitionToSSP.size() == 0) {
+      String msg = String.format("Cannot start KafkaConsumerProxy without any registered TopicPartitions for %s", systemName);
       LOG.error(msg);
       throw new SamzaException(msg);
     }
   }
 
-  /**
-   * Stop the thread and wait for it to stop.
-   * @param timeoutMs how long to wait in join
-   */
-  public void stop(long timeoutMs) {
-    LOG.info("Shutting down KafkaConsumerProxy poll thread:" + consumerPollThread.getName());
+  boolean isRunning() {
+    return isRunning;
+  }
 
-    isRunning = false;
-    try {
-      consumerPollThread.join(timeoutMs);
-      // join returns event if the thread didn't finish
-      // in this case we should interrupt it and wait again
-      if (consumerPollThread.isAlive()) {
-        consumerPollThread.interrupt();
-        consumerPollThread.join(timeoutMs);
+  Throwable getFailureCause() {
+    return failureCause;
+  }
+
+  private void initializeLags() {
+    // This is expensive, so only do it once at the beginning. After the first poll, we can rely on metrics for lag.
+    Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitionToSSP.keySet());
+    endOffsets.forEach((tp, offset) -> {
+      SystemStreamPartition ssp = topicPartitionToSSP.get(tp);
+      long startingOffset = nextOffsets.get(ssp);
+      // End offsets are the offset of the newest message + 1
+      // If the message we are about to consume is < end offset, we are starting with a lag.
+      long initialLag = endOffsets.get(tp) - startingOffset;
+
+      LOG.info("Initial lag for SSP {} is {} (end={}, startOffset={})", ssp, initialLag, endOffsets.get(tp), startingOffset);
+      latestLags.put(ssp, initialLag);
+      sink.setIsAtHighWatermark(ssp, initialLag == 0);
+    });
+
+    // initialize lag metrics
+    refreshLagMetrics();
+  }
+
+  // creates a separate thread for getting the messages.
+  private Runnable createProxyThreadRunnable() {
+    Runnable runnable = () -> {
+      isRunning = true;
+
+      try {
+        consumerPollThreadStartLatch.countDown();
+        LOG.info("Starting consumer poll thread {} for system {}", consumerPollThread.getName(), systemName);
+        initializeLags();
+        while (isRunning) {
+          fetchMessages();
+        }
+      } catch (Throwable throwable) {
+        LOG.error(String.format("Error in KafkaConsumerProxy poll thread for system: %s.", systemName), throwable);
+        // KafkaSystemConsumer uses the failureCause to propagate the throwable to the container
+        failureCause = throwable;
+        isRunning = false;
       }
-    } catch (InterruptedException e) {
-      LOG.warn("Join in KafkaConsumerProxy has failed", e);
-      consumerPollThread.interrupt();
-    }
+
+      if (!isRunning) {
+        LOG.info("KafkaConsumerProxy for system {} has stopped.", systemName);
+      }
+    };
+
+    return runnable;
   }
 
-  /**
-   * Add new partition to the list of polled partitions.
-   * This method should be called only at the beginning, before the thread is started.
-   */
-  public void addTopicPartition(SystemStreamPartition ssp, long nextOffset) {
-    LOG.info(String.format("Adding new topic and partition %s, offset = %s to queue for consumer %s", ssp, nextOffset,
-        this));
-    topicPartitions2SSP.put(KafkaSystemConsumer.toTopicPartition(ssp), ssp); //registered SSPs
+  private void fetchMessages() {
+    Set<SystemStreamPartition> sspsToFetch = new HashSet<>();
+    for (SystemStreamPartition ssp : nextOffsets.keySet()) {
+      if (sink.needsMoreMessages(ssp)) {
+        sspsToFetch.add(ssp);
+      }
+    }
+    LOG.debug("pollConsumer for {} SSPs: {}", sspsToFetch.size(), sspsToFetch);
+    if (!sspsToFetch.isEmpty()) {
+      kafkaConsumerMetrics.incClientReads(metricName);
 
-    // this is already vetted offset so there is no need to validate it
-    LOG.info(String.format("Got offset %s for new topic and partition %s.", nextOffset, ssp));
+      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> response;
 
-    nextOffsets.put(ssp, nextOffset);
+      response = pollConsumer(sspsToFetch, 500L);
+
+      // move the responses into the queue
+      for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> e : response.entrySet()) {
+        List<IncomingMessageEnvelope> envelopes = e.getValue();
+        if (envelopes != null) {
+          moveMessagesToTheirQueue(e.getKey(), envelopes);
+        }
+      }
+
+      populateCurrentLags(sspsToFetch); // find current lags for each SSP
+    } else { // nothing to read
+
+      LOG.debug("No topic/partitions need to be fetched for system {} right now. Sleeping {}ms.", systemName,
+          SLEEP_MS_WHILE_NO_TOPIC_PARTITION);
+
+      kafkaConsumerMetrics.incClientSkippedFetchRequests(metricName);
 
-    kafkaConsumerMetrics.setTopicPartitionValue(metricName, nextOffsets.size());
+      try {
+        Thread.sleep(SLEEP_MS_WHILE_NO_TOPIC_PARTITION);
+      } catch (InterruptedException e) {
+        LOG.warn("Sleep in fetchMessages was interrupted");
+      }
+    }
+    refreshLagMetrics();
   }
 
   // the actual polling of the messages from kafka
   private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> pollConsumer(
-      Set<SystemStreamPartition> systemStreamPartitions, long timeout) {
+      Set<SystemStreamPartition> systemStreamPartitions, long timeoutMs) {
 
     // Since we need to poll only from some subset of TopicPartitions (passed as the argument),
     // we need to pause the rest.
     List<TopicPartition> topicPartitionsToPause = new ArrayList<>();
     List<TopicPartition> topicPartitionsToPoll = new ArrayList<>();
 
-    for (Map.Entry<TopicPartition, SystemStreamPartition> e : topicPartitions2SSP.entrySet()) {
+    for (Map.Entry<TopicPartition, SystemStreamPartition> e : topicPartitionToSSP.entrySet()) {
       TopicPartition tp = e.getKey();
       SystemStreamPartition ssp = e.getValue();
       if (systemStreamPartitions.contains(ssp)) {
@@ -186,21 +268,18 @@ import org.slf4j.LoggerFactory;
     }
 
     ConsumerRecords<K, V> records;
-
     try {
       // Synchronize, in case the consumer is used in some other thread (metadata or something else)
       synchronized (kafkaConsumer) {
        // Since we are not polling from ALL the subscribed topics, we need to "change" the subscription temporarily
         kafkaConsumer.pause(topicPartitionsToPause);
         kafkaConsumer.resume(topicPartitionsToPoll);
-        records = kafkaConsumer.poll(timeout);
-        // resume original set of subscription - may be required for checkpointing
-        kafkaConsumer.resume(topicPartitionsToPause);
+        records = kafkaConsumer.poll(timeoutMs);
       }
     } catch (Exception e) {
       // we may get InvalidOffsetException | AuthorizationException | KafkaException exceptions,
       // but we still just rethrow, and log it up the stack.
-      LOG.error("Caught a Kafka exception in pollConsumer", e);
+      LOG.error("Caught a Kafka exception in pollConsumer for system " + systemName, e);
       throw e;
     }
 
@@ -209,12 +288,11 @@ import org.slf4j.LoggerFactory;
 
   private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> processResults(ConsumerRecords<K, V> records) {
     if (records == null) {
-      throw new SamzaException("ERROR:records is null, after pollConsumer call (in processResults)");
+      throw new SamzaException("Received null 'records' after polling consumer in KafkaConsumerProxy " + this);
     }
 
     Map<SystemStreamPartition, List<IncomingMessageEnvelope>> results = new HashMap<>(records.count());
     // Parse the returned records and convert them into the IncomingMessageEnvelope.
-    // Note. They have been already de-serialized by the consumer.
     for (ConsumerRecord<K, V> record : records) {
       int partition = record.partition();
       String topic = record.topic();
@@ -222,18 +300,18 @@ import org.slf4j.LoggerFactory;
 
       updateMetrics(record, tp);
 
-      SystemStreamPartition ssp = topicPartitions2SSP.get(tp);
-      List<IncomingMessageEnvelope> listMsgs = results.get(ssp);
-      if (listMsgs == null) {
-        listMsgs = new ArrayList<>();
-        results.put(ssp, listMsgs);
+      SystemStreamPartition ssp = topicPartitionToSSP.get(tp);
+      List<IncomingMessageEnvelope> messages = results.get(ssp);
+      if (messages == null) {
+        messages = new ArrayList<>();
+        results.put(ssp, messages);
       }
 
-      final K key = record.key();
-      final Object value = record.value();
-      final IncomingMessageEnvelope imEnvelope =
+      K key = record.key();
+      Object value = record.value();
+      IncomingMessageEnvelope imEnvelope =
           new IncomingMessageEnvelope(ssp, String.valueOf(record.offset()), key, value, getRecordSize(record));
-      listMsgs.add(imEnvelope);
+      messages.add(imEnvelope);
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("# records per SSP:");
@@ -246,52 +324,6 @@ import org.slf4j.LoggerFactory;
     return results;
   }
 
-   // creates a separate thread for getting the messages.
-  private Runnable createProxyThreadRunnable() {
-    Runnable runnable=  () -> {
-      isRunning = true;
-
-      try {
-        consumerPollThreadStartLatch.countDown();
-        LOG.info("Starting runnable " + consumerPollThread.getName());
-        initializeLags();
-        while (isRunning) {
-          fetchMessages();
-        }
-      } catch (Throwable throwable) {
-        LOG.error(String.format("Error in KafkaConsumerProxy poll thread for system: %s.", systemName), throwable);
-        // KafkaSystemConsumer uses the failureCause to propagate the throwable to the container
-        failureCause = throwable;
-        isRunning = false;
-      }
-
-      if (!isRunning) {
-        LOG.info("KafkaConsumerProxy for system {} has stopped.", systemName);
-      }
-    };
-
-    return runnable;
-  }
-
-  private void initializeLags() {
-    // This is expensive, so only do it once at the beginning. After the first poll, we can rely on metrics for lag.
-    Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions2SSP.keySet());
-    endOffsets.forEach((tp, offset) -> {
-      SystemStreamPartition ssp = topicPartitions2SSP.get(tp);
-      long startingOffset = nextOffsets.get(ssp);
-      // End offsets are the offset of the newest message + 1
-      // If the message we are about to consume is < end offset, we are starting with a lag.
-      long initialLag = endOffsets.get(tp) - startingOffset;
-
-      LOG.info("Initial lag for SSP {} is {} (end={}, startOffset={})", ssp, initialLag, endOffsets.get(tp), startingOffset);
-      latestLags.put(ssp, initialLag);
-      sink.setIsAtHighWatermark(ssp, initialLag == 0);
-    });
-
-    // initialize lag metrics
-    refreshLatencyMetrics();
-  }
-
   private int getRecordSize(ConsumerRecord<K, V> r) {
     int keySize = (r.key() == null) ? 0 : r.serializedKeySize();
     return keySize + r.serializedValueSize();
@@ -300,10 +332,16 @@ import org.slf4j.LoggerFactory;
   private void updateMetrics(ConsumerRecord<K, V> r, TopicPartition tp) {
     TopicAndPartition tap = KafkaSystemConsumer.toTopicAndPartition(tp);
     SystemStreamPartition ssp = new SystemStreamPartition(systemName, tp.topic(), new Partition(tp.partition()));
-    long currentSSPLag = getLatestLag(ssp); // lag between the current offset and the highwatermark
+
+    Long lag = latestLags.get(ssp);
+    if (lag == null) {
+      throw new SamzaException("Unknown/unregistered ssp in latestLags. ssp=" + ssp + "; system=" + systemName);
+    }
+    long currentSSPLag = lag.longValue(); // lag between the current offset and the highwatermark
     if (currentSSPLag < 0) {
       return;
     }
+
     long recordOffset = r.offset();
     long highWatermark = recordOffset + currentSSPLag; // derived value for the highwatermark
 
@@ -315,7 +353,6 @@ import org.slf4j.LoggerFactory;
     kafkaConsumerMetrics.setHighWatermarkValue(tap, highWatermark);
   }
 
-
   private void moveMessagesToTheirQueue(SystemStreamPartition ssp, List<IncomingMessageEnvelope> envelopes) {
     long nextOffset = nextOffsets.get(ssp);
 
@@ -329,16 +366,6 @@ import org.slf4j.LoggerFactory;
     nextOffsets.put(ssp, nextOffset);
   }
 
-  private void populateMetricNames(Set<SystemStreamPartition> ssps) {
-    HashMap<String, String> tags = new HashMap<>();
-    tags.put("client-id", clientId);// this is required by the KafkaConsumer to get the metrics
-
-    for (SystemStreamPartition ssp : ssps) {
-      TopicPartition tp = KafkaSystemConsumer.toTopicPartition(ssp);
-      perPartitionMetrics.put(ssp, new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags));
-    }
-  }
-
   // The only way to figure out lag for the KafkaConsumer is to look at the metrics after each poll() call.
   // One of the metrics (records-lag) shows how far behind the HighWatermark the consumer is.
   // This method populates the lag information for each SSP into latestLags member variable.
@@ -348,17 +375,23 @@ import org.slf4j.LoggerFactory;
 
     // populate the MetricNames first time
     if (perPartitionMetrics.isEmpty()) {
-      populateMetricNames(ssps);
+      HashMap<String, String> tags = new HashMap<>();
+      tags.put("client-id", clientId); // this is required by the KafkaConsumer to get the metrics
+
+      for (SystemStreamPartition ssp : ssps) {
+        TopicPartition tp = KafkaSystemConsumer.toTopicPartition(ssp);
+        perPartitionMetrics.put(ssp, new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags));
+      }
     }
 
     for (SystemStreamPartition ssp : ssps) {
       MetricName mn = perPartitionMetrics.get(ssp);
-      Metric currentLagM = consumerMetrics.get(mn);
+      Metric currentLagMetric = consumerMetrics.get(mn);
 
       // High watermark is fixed to be the offset of last available message,
       // so the lag is now at least 0, which is the same as Samza's definition.
       // If the lag is not 0, then isAtHead is not true, and kafkaClient keeps polling.
-      long currentLag = (currentLagM != null) ? (long) currentLagM.value() : -1L;
+      long currentLag = (currentLagMetric != null) ? (long) currentLagMetric.value() : -1L;
       latestLags.put(ssp, currentLag);
 
       // calls the setIsAtHead for the BlockingEnvelopeMap
@@ -366,58 +399,7 @@ import org.slf4j.LoggerFactory;
     }
   }
 
-  // Get the latest lag for a specific SSP.
-  private long getLatestLag(SystemStreamPartition ssp) {
-    Long lag = latestLags.get(ssp);
-    if (lag == null) {
-      throw new SamzaException("Unknown/unregistered ssp in latestLags request: " + ssp);
-    }
-    return lag;
-  }
-
-  // Using the consumer to poll the messages from the stream.
-  private void fetchMessages() {
-    Set<SystemStreamPartition> sspsToFetch = new HashSet<>();
-    for (SystemStreamPartition ssp : nextOffsets.keySet()) {
-      if (sink.needsMoreMessages(ssp)) {
-        sspsToFetch.add(ssp);
-      }
-    }
-    LOG.debug("pollConsumer {}", sspsToFetch.size());
-    if (!sspsToFetch.isEmpty()) {
-      kafkaConsumerMetrics.incClientReads(metricName);
-
-      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> response;
-      LOG.debug("pollConsumer from following SSPs: {}; total#={}", sspsToFetch, sspsToFetch.size());
-
-      response = pollConsumer(sspsToFetch, 500L);
-
-      // move the responses into the queue
-      for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> e : response.entrySet()) {
-        List<IncomingMessageEnvelope> envelopes = e.getValue();
-        if (envelopes != null) {
-          moveMessagesToTheirQueue(e.getKey(), envelopes);
-        }
-      }
-
-      populateCurrentLags(sspsToFetch); // find current lags for for each SSP
-    } else { // nothing to read
-
-      LOG.debug("No topic/partitions need to be fetched for consumer {} right now. Sleeping {}ms.", kafkaConsumer,
-          SLEEP_MS_WHILE_NO_TOPIC_PARTITION);
-
-      kafkaConsumerMetrics.incClientSkippedFetchRequests(metricName);
-
-      try {
-        Thread.sleep(SLEEP_MS_WHILE_NO_TOPIC_PARTITION);
-      } catch (InterruptedException e) {
-        LOG.warn("Sleep in fetchMessages was interrupted");
-      }
-    }
-    refreshLatencyMetrics();
-  }
-
-  private void refreshLatencyMetrics() {
+  private void refreshLagMetrics() {
     for (Map.Entry<SystemStreamPartition, Long> e : nextOffsets.entrySet()) {
       SystemStreamPartition ssp = e.getKey();
       Long offset = e.getValue();
@@ -433,12 +415,9 @@ import org.slf4j.LoggerFactory;
     }
   }
 
-  boolean isRunning() {
-    return isRunning;
-  }
-
-  Throwable getFailureCause() {
-    return failureCause;
+  @Override
+  public String toString() {
+    return String.format("consumerProxy-%s-%s", systemName, clientId);
   }
 }
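
For readers unfamiliar with the pause/resume idiom that pollConsumer() relies on, a self-contained sketch; the broker address, topic, partitions, and deserializers are assumptions, not values from this commit:

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class SelectivePollSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
          TopicPartition wanted = new TopicPartition("events", 0); // assumed topic
          TopicPartition parked = new TopicPartition("events", 1);
          consumer.assign(Arrays.asList(wanted, parked));

          // Poll only a subset of the assigned partitions by pausing the rest,
          // which is the same trick pollConsumer() uses above.
          consumer.pause(Arrays.asList(parked));
          consumer.resume(Arrays.asList(wanted));
          ConsumerRecords<byte[], byte[]> records = consumer.poll(500L);
          System.out.println("records fetched: " + records.count());
        }
      }
    }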
 

http://git-wip-us.apache.org/repos/asf/samza/blob/190a3999/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
index 17f29f1..10ce274 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -66,10 +66,10 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
   final KafkaConsumerMessageSink messageSink;
 
   // This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates
-  // BlockeingEnvelopMap's buffers.
+  // BlockingEnvelopeMap's buffers.
   final private KafkaConsumerProxy proxy;
 
-  // keep registration data until the start - mapping between registered SSPs and topicPartitions, and the offsets
+  // keep registration data until the start - mapping between registered SSPs and topicPartitions, and their offsets
   final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>();
   final Map<TopicPartition, SystemStreamPartition> topicPartitionsToSSP = new HashMap<>();
 
@@ -77,10 +77,11 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
   long perPartitionFetchThresholdBytes;
 
   /**
+   * Create a KafkaSystemConsumer for the provided {@code systemName}
    * @param systemName system name for which we create the consumer
-   * @param config config passed into the the app
-   * @param metrics metrics collecting object
-   * @param clock - system clock, allows to override internal clock (System.currentTimeMillis())
+   * @param config application config
+   * @param metrics metrics for this KafkaSystemConsumer
+   * @param clock system clock
    */
   public KafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, Config config, String clientId,
       KafkaSystemConsumerMetrics metrics, Clock clock) {
@@ -99,11 +100,9 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     messageSink = new KafkaConsumerMessageSink();
 
     // Create the proxy to do the actual message reading.
-    String metricName = String.format("%s %s", systemName, clientId);
+    String metricName = systemName;
     proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, metrics, metricName);
     LOG.info("{}: Created KafkaConsumerProxy {} ", this, proxy);
-
-    LOG.info("{}: Created KafkaSystemConsumer {}", this, kafkaConsumer);
   }
 
   /**
@@ -118,7 +117,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     // extract kafka client configs
     KafkaConsumerConfig consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId);
 
-    LOG.info("{}:{} KafkaClient properties {}", systemName, clientId, consumerConfig);
+    LOG.info("{}: KafkaClient properties {}", systemName, consumerConfig);
 
     return new KafkaConsumer(consumerConfig);
   }
@@ -130,7 +129,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
       return;
     }
     if (stopped.get()) {
-      LOG.warn("{}: Attempting to start a stopped consumer", this);
+      LOG.error("{}: Attempting to start a stopped consumer", this);
       return;
     }
     // initialize the subscriptions for all the registered TopicPartitions
@@ -151,8 +150,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
         kafkaConsumer.assign(topicPartitionsToSSP.keySet());
       }
     } catch (Exception e) {
-      LOG.warn("{}: Start subscription failed", this);
-      throw new SamzaException(e);
+      throw new SamzaException("Consumer subscription failed for " + this, e);
     }
   }
 
@@ -164,7 +162,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
   void startConsumer() {
     // set the offset for each TopicPartition
     if (topicPartitionsToOffset.size() <= 0) {
-      LOG.warn("{}: Consumer is not subscribed to any SSPs", this);
+      LOG.error("{}: Consumer is not subscribed to any SSPs", this);
     }
 
     topicPartitionsToOffset.forEach((tp, startingOffsetString) -> {
@@ -204,35 +202,30 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     long fetchThreshold = FETCH_THRESHOLD;
     if (fetchThresholdOption.isDefined()) {
       fetchThreshold = Long.valueOf(fetchThresholdOption.get());
-      LOG.info("{}: fetchThresholdOption is configured. fetchThreshold={}", this, fetchThreshold);
     }
 
     Option<String> fetchThresholdBytesOption = kafkaConfig.getConsumerFetchThresholdBytes(systemName);
     long fetchThresholdBytes = FETCH_THRESHOLD_BYTES;
     if (fetchThresholdBytesOption.isDefined()) {
       fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get());
-      LOG.info("{}: fetchThresholdBytesOption is configured. fetchThresholdBytes={}", this, fetchThresholdBytes);
     }
 
-    int numTPs = topicPartitionsToSSP.size();
-    if (numTPs != topicPartitionsToOffset.size()) {
+    int numPartitions = topicPartitionsToSSP.size();
+    if (numPartitions != topicPartitionsToOffset.size()) {
       throw new SamzaException("topicPartitionsToSSP.size() doesn't match topicPartitionsToOffset.size()");
     }
 
-    LOG.info("{}: fetchThresholdBytes = {}; fetchThreshold={}; partitions num={}", this, fetchThresholdBytes,
-        fetchThreshold, numTPs);
 
-    if (numTPs > 0) {
-      perPartitionFetchThreshold = fetchThreshold / numTPs;
-      LOG.info("{}: perPartitionFetchThreshold={}", this, perPartitionFetchThreshold);
+    if (numPartitions > 0) {
+      perPartitionFetchThreshold = fetchThreshold / numPartitions;
       if (fetchThresholdBytesEnabled) {
         // currently this feature cannot be enabled, because we do not have the size of the messages available.
         // messages get double buffered, hence divide by 2
-        perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numTPs;
-        LOG.info("{} :perPartitionFetchThresholdBytes is enabled. perPartitionFetchThresholdBytes={}", this,
-            perPartitionFetchThresholdBytes);
+        perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numPartitions;
       }
     }
+    LOG.info("{}: fetchThresholdBytes = {}; fetchThreshold={}; numPartitions={}, perPartitionFetchThreshold={}, perPartitionFetchThresholdBytes(0 if disabled)={}",
+        this, fetchThresholdBytes, fetchThreshold, numPartitions, perPartitionFetchThreshold, perPartitionFetchThresholdBytes);
   }
 
   @Override
@@ -260,8 +253,10 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     }
   }
 
-  /*
-   record the ssp and the offset. Do not submit it to the consumer yet.
+  /**
+   * Record the SSP and its offset, but do not submit them to the consumer yet.
+   * @param systemStreamPartition ssp to register
+   * @param offset offset to register with
    */
   @Override
   public void register(SystemStreamPartition systemStreamPartition, String offset) {
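
To make the threshold arithmetic in setFetchThresholds() concrete, a small worked example; the byte threshold and partition count are assumed values (50000 messages is Samza's default fetch threshold):

    // Worked example of the per-partition threshold math; values are assumptions.
    public class FetchThresholdSketch {
      public static void main(String[] args) {
        long fetchThreshold = 50000;                   // systems.<system>.samza.fetch.threshold
        long fetchThresholdBytes = 100L * 1024 * 1024; // assumed configured bytes threshold
        int numPartitions = 8;                         // assumed registered partitions

        long perPartitionFetchThreshold = fetchThreshold / numPartitions;
        // messages are double buffered, hence the division by 2
        long perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numPartitions;

        System.out.println(perPartitionFetchThreshold);      // 6250
        System.out.println(perPartitionFetchThresholdBytes); // 6553600
      }
    }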

http://git-wip-us.apache.org/repos/asf/samza/blob/190a3999/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
index c4552e6..59a8854 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
@@ -55,7 +55,7 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr
 
   // java friendlier interfaces
   // Gauges
-  def setTopicPartitionValue(clientName: String, value: Int) {
+  def setNumTopicPartitions(clientName: String, value: Int) {
     topicPartitions.get(clientName).set(value)
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/190a3999/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
index 35a717a..de5d093 100644
--- a/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
+++ b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
@@ -28,7 +28,7 @@ import org.junit.Test;
 
 
 public class TestKafkaConsumerConfig {
-  private final Map<String, String> props = new HashMap<>();
+
   public final static String SYSTEM_NAME = "testSystem";
   public final static String KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer.";
   public final static String KAFKA_CONSUMER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".consumer.";
@@ -36,6 +36,7 @@ public class TestKafkaConsumerConfig {
 
   @Test
   public void testDefaults() {
+    Map<String, String> props = new HashMap<>();
 
     props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // should be ignored
     props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
@@ -43,6 +44,8 @@ public class TestKafkaConsumerConfig {
     props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
         "100"); // should NOT be ignored
 
+    props.put(JobConfig.JOB_NAME(), "jobName");
+
     // if KAFKA_CONSUMER_PROPERTY_PREFIX is set, then PRODUCER should be ignored
     props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "ignoreThis:9092");
     props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + "bootstrap.servers", "useThis:9092");
@@ -72,11 +75,23 @@ public class TestKafkaConsumerConfig {
 
     Assert.assertEquals(KafkaConsumerConfig.getConsumerGroupId(config),
         kafkaConsumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG));
+
+    Assert.assertEquals(KafkaConsumerConfig.CONSUMER_CLIENT_ID_PREFIX.replace("-", "_") + "-jobName-1",
+        KafkaConsumerConfig.getConsumerClientId(config));
+    Assert.assertEquals("jobName-1", KafkaConsumerConfig.getConsumerGroupId(config));
+
+    props.put(JobConfig.JOB_ID(), "jobId");
+    config = new MapConfig(props);
+
+    Assert.assertEquals(KafkaConsumerConfig.CONSUMER_CLIENT_ID_PREFIX.replace("-", "_") + "-jobName-jobId",
+        KafkaConsumerConfig.getConsumerClientId(config));
+    Assert.assertEquals("jobName-jobId", KafkaConsumerConfig.getConsumerGroupId(config));
   }
 
   // test stuff that should not be overridden
   @Test
   public void testNotOverride() {
+    Map<String, String> props = new HashMap<>();
 
     // if KAFKA_CONSUMER_PROPERTY_PREFIX is not set, then PRODUCER should be used
     props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "useThis:9092");
@@ -85,6 +100,8 @@ public class TestKafkaConsumerConfig {
     props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
         TestKafkaConsumerConfig.class.getName());
 
+    props.put(JobConfig.JOB_NAME(), "jobName");
+
     Config config = new MapConfig(props);
     KafkaConsumerConfig kafkaConsumerConfig =
         KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, CLIENT_ID);