Posted to commits@samza.apache.org by bo...@apache.org on 2018/09/26 00:23:23 UTC

[24/29] samza git commit: more review comments

more review comments


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4ae563c6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4ae563c6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4ae563c6

Branch: refs/heads/NewKafkaSystemConsumer
Commit: 4ae563c609aaaf0d2ceaeb90ab9ae33dfdc8d601
Parents: 5397a34
Author: Boris S <bo...@apache.org>
Authored: Mon Sep 24 12:31:25 2018 -0700
Committer: Boris S <bo...@apache.org>
Committed: Mon Sep 24 12:31:25 2018 -0700

----------------------------------------------------------------------
 .../samza/config/KafkaConsumerConfig.java       | 19 ++---
 .../samza/system/kafka/KafkaSystemConsumer.java | 40 +++++----
 .../samza/system/kafka/KafkaSystemFactory.scala |  1 -
 .../samza/config/TestKafkaConsumerConfig.java   | 85 ++++++++------------
 .../system/kafka/TestKafkaSystemAdminJava.java  | 18 ++---
 .../system/kafka/TestKafkaSystemConsumer.java   | 40 ++++-----
 6 files changed, 84 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4ae563c6/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
index 4bbe00f..7d2408b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
@@ -23,13 +23,11 @@ package org.apache.samza.config;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Properties;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.samza.SamzaException;
-import org.apache.samza.config.JobConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
@@ -78,7 +76,6 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
     consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
     consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
 
-
     // These are values we enforce in Samza, and they cannot be overwritten.
 
     // Disable consumer auto-commit because Samza controls commits
@@ -86,7 +83,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
 
     // Translate samza config value to kafka config value
     consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-        getAutoOffsetResetValue((String)consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)));
+        getAutoOffsetResetValue((String) consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)));
 
     // make sure bootstrap configs are in, if not - get them from the producer
     if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
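
For reviewers tracing the bootstrap-servers fallback that starts here: a minimal
sketch of the intended behavior. Only the Kafka property key and the
StringUtils/SamzaException types come from the imports above; the producer
config path and the surrounding accessors (subConf, config, systemName) are
assumptions about code not shown in this hunk.

    if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
      // fall back to the producer's bootstrap.servers for the same system
      String fromProducer = config.get(
          String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
      if (StringUtils.isBlank(fromProducer)) {
        // the tests in this patch expect a SamzaException when no servers are configured
        throw new SamzaException(
            "Missing " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " config for system " + systemName);
      }
      consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, fromProducer);
    }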
@@ -148,9 +145,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
     String jobName = config.get(JobConfig.JOB_NAME());
     String jobId = (config.get(JobConfig.JOB_ID()) != null) ? config.get(JobConfig.JOB_ID()) : "1";
 
-    return String.format("%s-%s-%s", id.replaceAll(
-        "\\W", "_"),
-        jobName.replaceAll("\\W", "_"),
+    return String.format("%s-%s-%s", id.replaceAll("\\W", "_"), jobName.replaceAll("\\W", "_"),
         jobId.replaceAll("\\W", "_"));
   }
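
The client-id construction above normalizes each component with
replaceAll("\\W", "_") before joining with dashes. A standalone illustration,
using the same inputs as the client-id tests later in this patch:

    String id = "super-duper-consumer";
    String jobName = " very important!job";
    String jobId = "number-#3";
    String clientId = String.format("%s-%s-%s",
        id.replaceAll("\\W", "_"),       // "super_duper_consumer"
        jobName.replaceAll("\\W", "_"),  // "_very_important_job"
        jobId.replaceAll("\\W", "_"));   // "number__3"
    // clientId == "super_duper_consumer-_very_important_job-number__3"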
 
@@ -172,7 +167,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
     final String KAFKA_OFFSET_NONE = "none";
 
     if (autoOffsetReset == null) {
-     return KAFKA_OFFSET_LATEST; // return default
+      return KAFKA_OFFSET_LATEST; // return default
     }
 
     // accept kafka values directly
@@ -184,15 +179,15 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
     String newAutoOffsetReset;
     switch (autoOffsetReset) {
       case SAMZA_OFFSET_LARGEST:
-        newAutoOffsetReset =  KAFKA_OFFSET_LATEST;
+        newAutoOffsetReset = KAFKA_OFFSET_LATEST;
         break;
       case SAMZA_OFFSET_SMALLEST:
-        newAutoOffsetReset =  KAFKA_OFFSET_EARLIEST;
+        newAutoOffsetReset = KAFKA_OFFSET_EARLIEST;
         break;
       default:
-        newAutoOffsetReset =  KAFKA_OFFSET_LATEST;
+        newAutoOffsetReset = KAFKA_OFFSET_LATEST;
     }
-    LOG.info("AutoOffsetReset value converted from {} to {}", autoOffsetReset,  newAutoOffsetReset);
+    LOG.info("AutoOffsetReset value converted from {} to {}", autoOffsetReset, newAutoOffsetReset);
     return newAutoOffsetReset;
   }
 }
\ No newline at end of file
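
To make the translation in getAutoOffsetResetValue easy to scan, here is the
full mapping as a standalone sketch. The Kafka values are the literal constants
from the method above; the Samza constants are assumed to be the legacy strings
"largest" and "smallest", and the helper name is illustrative:

    // Samza legacy values -> Kafka consumer auto.offset.reset values.
    static String toKafkaAutoOffsetReset(String samzaValue) {
      if (samzaValue == null) {
        return "latest"; // default
      }
      switch (samzaValue) {
        case "latest":   // Kafka values pass through unchanged
        case "earliest":
        case "none":
          return samzaValue;
        case "largest":
          return "latest";
        case "smallest":
          return "earliest";
        default:
          return "latest"; // unrecognized values fall back to the default
      }
    }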

http://git-wip-us.apache.org/repos/asf/samza/blob/4ae563c6/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
index e5ded8d..17f29f1 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -31,11 +31,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import kafka.common.TopicAndPartition;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.samza.config.KafkaConsumerConfig;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.KafkaConsumerConfig;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemStreamPartition;
@@ -69,7 +69,6 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
  // BlockingEnvelopeMap's buffers.
   final private KafkaConsumerProxy proxy;
 
-
   // keep registration data until the start - mapping between registered SSPs and topicPartitions, and the offsets
   final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>();
   final Map<TopicPartition, SystemStreamPartition> topicPartitionsToSSP = new HashMap<>();
@@ -102,7 +101,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     // Create the proxy to do the actual message reading.
     String metricName = String.format("%s %s", systemName, clientId);
     proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, metrics, metricName);
-    LOG.info("{}: Created KafkaConsumerProxy {} ", this, proxy );
+    LOG.info("{}: Created KafkaConsumerProxy {} ", this, proxy);
 
     LOG.info("{}: Created KafkaSystemConsumer {}", this, kafkaConsumer);
   }
@@ -117,8 +116,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
   public static KafkaConsumer<byte[], byte[]> getKafkaConsumerImpl(String systemName, String clientId, Config config) {
 
     // extract kafka client configs
-    KafkaConsumerConfig consumerConfig =
-        KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId);
+    KafkaConsumerConfig consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId);
 
     LOG.info("{}:{} KafkaClient properties {}", systemName, clientId, consumerConfig);
 
@@ -179,8 +177,8 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
       } catch (Exception e) {
        // all recoverable exceptions are handled by the client.
         // if we get here there is nothing left to do but bail out.
-        String msg = String.format("%s: Got Exception while seeking to %s for partition %s",
-            this, startingOffsetString, tp);
+        String msg =
+            String.format("%s: Got Exception while seeking to %s for partition %s", this, startingOffsetString, tp);
         LOG.error(msg, e);
         throw new SamzaException(msg, e);
       }
@@ -217,12 +215,12 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     }
 
     int numTPs = topicPartitionsToSSP.size();
-    if (numTPs == topicPartitionsToOffset.size()) {
+    if (numTPs != topicPartitionsToOffset.size()) {
       throw new SamzaException("topicPartitionsToSSP.size() doesn't match topicPartitionsToOffset.size()");
     }
 
-    LOG.info("{}: fetchThresholdBytes = {}; fetchThreshold={}; partitions num={}",
-        this, fetchThresholdBytes, fetchThreshold, numTPs);
+    LOG.info("{}: fetchThresholdBytes = {}; fetchThreshold={}; partitions num={}", this, fetchThresholdBytes,
+        fetchThreshold, numTPs);
 
     if (numTPs > 0) {
       perPartitionFetchThreshold = fetchThreshold / numTPs;
@@ -231,8 +229,8 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
         // currently this feature cannot be enabled, because we do not have the size of the messages available.
         // messages get double buffered, hence divide by 2
         perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numTPs;
-        LOG.info("{} :perPartitionFetchThresholdBytes is enabled. perPartitionFetchThresholdBytes={}",
-            this, perPartitionFetchThresholdBytes);
+        LOG.info("{} :perPartitionFetchThresholdBytes is enabled. perPartitionFetchThresholdBytes={}", this,
+            perPartitionFetchThresholdBytes);
       }
     }
   }
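
A worked instance of the division above, using the values from
TestKafkaSystemConsumer in this patch (FETCH_THRESHOLD_MSGS = 50000,
FETCH_THRESHOLD_BYTES = 100000, 50 registered partitions):

    int numTPs = 50;
    long fetchThreshold = 50000;
    long fetchThresholdBytes = 100000;
    long perPartitionFetchThreshold = fetchThreshold / numTPs;                  // 50000 / 50 = 1000
    long perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numTPs;  // 100000 / 2 / 50 = 1000
    // bytes are halved because messages get double buffered (see the comment above)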
@@ -268,9 +266,8 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
   @Override
   public void register(SystemStreamPartition systemStreamPartition, String offset) {
     if (started.get()) {
-      String msg =
-          String.format("%s: Trying to register partition after consumer has been started. ssp=%s",
-              this, systemStreamPartition);
+      String msg = String.format("%s: Trying to register partition after consumer has been started. ssp=%s", this,
+          systemStreamPartition);
       throw new SamzaException(msg);
     }
 
@@ -286,7 +283,6 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
 
     topicPartitionsToSSP.put(tp, systemStreamPartition);
 
-
     String existingOffset = topicPartitionsToOffset.get(tp);
     // register the older (of the two) offset in the consumer, to guarantee we do not miss any messages.
     if (existingOffset == null || compareOffsets(existingOffset, offset) > 0) {
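
The guard above keeps the older of the two offsets when the same TopicPartition
is registered twice, so no messages are skipped. A minimal sketch of that rule,
assuming Kafka offsets compare numerically (compareOffsets itself is not shown
in this hunk):

    String existingOffset = topicPartitionsToOffset.get(tp);
    // register the smaller (earlier) starting offset
    if (existingOffset == null || Long.parseLong(existingOffset) > Long.parseLong(offset)) {
      topicPartitionsToOffset.put(tp, offset);
    }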
@@ -353,10 +349,10 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     }
 
     boolean needsMoreMessages(SystemStreamPartition ssp) {
-        LOG.debug("{}: needsMoreMessages from following SSP: {}. fetchLimitByBytes enabled={}; messagesSizeInQueue={};"
-                + "(limit={}); messagesNumInQueue={}(limit={};", this, ssp, fetchThresholdBytesEnabled,
-            getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, getNumMessagesInQueue(ssp),
-            perPartitionFetchThreshold);
+      LOG.debug("{}: needsMoreMessages from following SSP: {}. fetchLimitByBytes enabled={}; messagesSizeInQueue={};"
+              + "(limit={}); messagesNumInQueue={}(limit={};", this, ssp, fetchThresholdBytesEnabled,
+          getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, getNumMessagesInQueue(ssp),
+          perPartitionFetchThreshold);
 
       if (fetchThresholdBytesEnabled) {
         return getMessagesSizeInQueue(ssp) < perPartitionFetchThresholdBytes;
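
For completeness, the diff cuts this method off after the bytes branch; the
fallback is presumably a message-count check against perPartitionFetchThreshold.
A sketch of the whole gate under that assumption:

    boolean needsMoreMessages(SystemStreamPartition ssp) {
      if (fetchThresholdBytesEnabled) {
        return getMessagesSizeInQueue(ssp) < perPartitionFetchThresholdBytes;
      }
      // assumed fallback: limit by buffered message count instead of bytes
      return getNumMessagesInQueue(ssp) < perPartitionFetchThreshold;
    }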
@@ -372,8 +368,8 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
         put(ssp, envelope);
       } catch (InterruptedException e) {
         throw new SamzaException(
-            String.format("%s: Consumer was interrupted while trying to add message with offset %s for ssp %s",
-                this, envelope.getOffset(), ssp));
+            String.format("%s: Consumer was interrupted while trying to add message with offset %s for ssp %s", this,
+                envelope.getOffset(), ssp));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/4ae563c6/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index deaee56..ba5390b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -22,7 +22,6 @@ package org.apache.samza.system.kafka
 import java.util.Properties
 
 import kafka.utils.ZkUtils
-import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.samza.SamzaException
 import org.apache.samza.config.ApplicationConfig.ApplicationMode

http://git-wip-us.apache.org/repos/asf/samza/blob/4ae563c6/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
index 719ea22..35a717a 100644
--- a/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
+++ b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.samza.SamzaException;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 
@@ -35,84 +34,67 @@ public class TestKafkaConsumerConfig {
   public final static String KAFKA_CONSUMER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".consumer.";
   private final static String CLIENT_ID = "clientId";
 
-  @Before
-  public void setProps() {
-
-  }
-
   @Test
   public void testDefaults() {
 
     props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // should be ignored
-    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "Ignore"); // should be ignored
-    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); // should NOT be ignored
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+        "Ignore"); // should be ignored
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
+        "100"); // should NOT be ignored
 
     // if KAFKA_CONSUMER_PROPERTY_PREFIX is set, then PRODUCER should be ignored
    props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "ignoreThis:9092");
     props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + "bootstrap.servers", "useThis:9092");
 
     Config config = new MapConfig(props);
-    KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(
-        config, SYSTEM_NAME, CLIENT_ID);
+    KafkaConsumerConfig kafkaConsumerConfig =
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, CLIENT_ID);
 
     Assert.assertEquals("false", kafkaConsumerConfig.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
 
-    Assert.assertEquals(
-        KafkaConsumerConfig.DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS,
+    Assert.assertEquals(KafkaConsumerConfig.DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS,
         kafkaConsumerConfig.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
 
-    Assert.assertEquals(
-        RangeAssignor.class.getName(),
+    Assert.assertEquals(RangeAssignor.class.getName(),
         kafkaConsumerConfig.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
 
-    Assert.assertEquals(
-        "useThis:9092",
-        kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-    Assert.assertEquals(
-        "100",
-        kafkaConsumerConfig.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+    Assert.assertEquals("useThis:9092", kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+    Assert.assertEquals("100", kafkaConsumerConfig.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
 
-    Assert.assertEquals(
-        ByteArrayDeserializer.class.getName(),
+    Assert.assertEquals(ByteArrayDeserializer.class.getName(),
         kafkaConsumerConfig.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
 
-    Assert.assertEquals(
-        ByteArrayDeserializer.class.getName(),
-        kafkaConsumerConfig.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) );
+    Assert.assertEquals(ByteArrayDeserializer.class.getName(),
+        kafkaConsumerConfig.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
 
-    Assert.assertEquals(
-        CLIENT_ID,
-        kafkaConsumerConfig.get(ConsumerConfig.CLIENT_ID_CONFIG));
+    Assert.assertEquals(CLIENT_ID, kafkaConsumerConfig.get(ConsumerConfig.CLIENT_ID_CONFIG));
 
-    Assert.assertEquals(
-        KafkaConsumerConfig.getConsumerGroupId(config),
+    Assert.assertEquals(KafkaConsumerConfig.getConsumerGroupId(config),
         kafkaConsumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG));
   }
 
-  @Test
   // test stuff that should not be overridden
+  @Test
   public void testNotOverride() {
 
     // if KAFKA_CONSUMER_PROPERTY_PREFIX is not set, then PRODUCER should be used
     props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "useThis:9092");
-    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName());
-    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName());
-
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+        TestKafkaConsumerConfig.class.getName());
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+        TestKafkaConsumerConfig.class.getName());
 
     Config config = new MapConfig(props);
-    KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(
-        config, SYSTEM_NAME, CLIENT_ID);
+    KafkaConsumerConfig kafkaConsumerConfig =
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, CLIENT_ID);
 
-    Assert.assertEquals(
-        "useThis:9092",
-        kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+    Assert.assertEquals("useThis:9092", kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
 
-    Assert.assertEquals(
-        TestKafkaConsumerConfig.class.getName(),
+    Assert.assertEquals(TestKafkaConsumerConfig.class.getName(),
         kafkaConsumerConfig.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
 
-    Assert.assertEquals(
-        TestKafkaConsumerConfig.class.getName(),
+    Assert.assertEquals(TestKafkaConsumerConfig.class.getName(),
         kafkaConsumerConfig.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
   }
 
@@ -122,30 +104,29 @@ public class TestKafkaConsumerConfig {
 
     map.put(JobConfig.JOB_NAME(), "jobName");
     map.put(JobConfig.JOB_ID(), "jobId");
-    String result =  KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
+    String result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
     Assert.assertEquals("consumer-jobName-jobId", result);
 
-    result =  KafkaConsumerConfig.getConsumerClientId("consumer-", new MapConfig(map));
+    result = KafkaConsumerConfig.getConsumerClientId("consumer-", new MapConfig(map));
     Assert.assertEquals("consumer_-jobName-jobId", result);
 
-    result =  KafkaConsumerConfig.getConsumerClientId("super-duper-consumer", new MapConfig(map));
+    result = KafkaConsumerConfig.getConsumerClientId("super-duper-consumer", new MapConfig(map));
     Assert.assertEquals("super_duper_consumer-jobName-jobId", result);
 
     map.put(JobConfig.JOB_NAME(), " very important!job");
-    result =  KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
+    result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
     Assert.assertEquals("consumer-_very_important_job-jobId", result);
 
     map.put(JobConfig.JOB_ID(), "number-#3");
-    result =  KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
+    result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
     Assert.assertEquals("consumer-_very_important_job-number__3", result);
   }
 
-
-
   @Test(expected = SamzaException.class)
   public void testNoBootstrapServers() {
-    KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(
-        new MapConfig(Collections.emptyMap()), SYSTEM_NAME, "clientId");
+    KafkaConsumerConfig kafkaConsumerConfig =
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(new MapConfig(Collections.emptyMap()), SYSTEM_NAME,
+            "clientId");
 
     Assert.fail("didn't get exception for the missing config:" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/4ae563c6/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index 77f47f9..7e968bf 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -19,17 +19,14 @@
 
 package org.apache.samza.system.kafka;
 
-import java.util.*;
 import java.util.HashMap;
 import java.util.Map;
-
+import java.util.Properties;
 import kafka.api.TopicMetadata;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.StreamValidationException;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.util.ScalaJavaUtil;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -71,7 +68,6 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
 
     admin.createStream(spec);
     admin.validateStream(spec);
-
   }
 
   @Test
@@ -143,7 +139,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
   public void testCreateStream() {
     StreamSpec spec = new StreamSpec("testId", "testStream", "testSystem", 8);
 
-    assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec));
+    assertTrue("createStream should return true if the stream does not exist and then is created.",
+        systemAdmin().createStream(spec));
     systemAdmin().validateStream(spec);
 
     assertFalse("createStream should return false if the stream already exists.", systemAdmin().createStream(spec));
@@ -162,7 +159,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
     StreamSpec spec1 = new StreamSpec("testId", "testStreamPartition", "testSystem", 8);
     StreamSpec spec2 = new StreamSpec("testId", "testStreamPartition", "testSystem", 4);
 
-    assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec1));
+    assertTrue("createStream should return true if the stream does not exist and then is created.",
+        systemAdmin().createStream(spec1));
 
     systemAdmin().validateStream(spec2);
   }
@@ -172,7 +170,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
     StreamSpec spec1 = new StreamSpec("testId", "testStreamName1", "testSystem", 8);
     StreamSpec spec2 = new StreamSpec("testId", "testStreamName2", "testSystem", 8);
 
-    assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec1));
+    assertTrue("createStream should return true if the stream does not exist and then is created.",
+        systemAdmin().createStream(spec1));
 
     systemAdmin().validateStream(spec2);
   }
@@ -181,7 +180,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
   public void testClearStream() {
     StreamSpec spec = new StreamSpec("testId", "testStreamClear", "testSystem", 8);
 
-    assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec));
+    assertTrue("createStream should return true if the stream does not exist and then is created.",
+        systemAdmin().createStream(spec));
     assertTrue(systemAdmin().clearStream(spec));
 
     scala.collection.immutable.Set<String> topic = new scala.collection.immutable.Set.Set1<>(spec.getPhysicalName());

http://git-wip-us.apache.org/repos/asf/samza/blob/4ae563c6/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
index 9e8ff44..933558c 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
@@ -21,24 +21,22 @@
 
 package org.apache.samza.system.kafka;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.samza.config.KafkaConsumerConfig;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.KafkaConsumerConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.NoOpMetricsRegistry;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -52,12 +50,7 @@ public class TestKafkaSystemConsumer {
   public final String FETCH_THRESHOLD_MSGS = "50000";
   public final String FETCH_THRESHOLD_BYTES = "100000";
 
-  @Before
-  public void setUp() {
-
-  }
-
-  private KafkaSystemConsumer setupConsumer(String fetchMsg, String fetchBytes) {
+  private KafkaSystemConsumer createConsumer(String fetchMsg, String fetchBytes) {
     final Map<String, String> map = new HashMap<>();
 
     map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD(), TEST_SYSTEM), fetchMsg);
@@ -70,8 +63,8 @@ public class TestKafkaSystemConsumer {
         KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, TEST_CLIENT_ID);
     final KafkaConsumer<byte[], byte[]> kafkaConsumer = new MockKafkaConsumer(consumerConfig);
 
-    MockKafkaSystmeCosumer newKafkaSystemConsumer =
-        new MockKafkaSystmeCosumer(kafkaConsumer, TEST_SYSTEM, config, TEST_CLIENT_ID,
+    MockKafkaSystemConsumer newKafkaSystemConsumer =
+        new MockKafkaSystemConsumer(kafkaConsumer, TEST_SYSTEM, config, TEST_CLIENT_ID,
             new KafkaSystemConsumerMetrics(TEST_SYSTEM, new NoOpMetricsRegistry()), System::currentTimeMillis);
 
     return newKafkaSystemConsumer;
@@ -80,7 +73,7 @@ public class TestKafkaSystemConsumer {
   @Test
   public void testConfigValidations() {
 
-    final KafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+    final KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
 
     consumer.start();
     // should be no failures
@@ -88,7 +81,7 @@ public class TestKafkaSystemConsumer {
 
   @Test
   public void testFetchThresholdShouldDivideEvenlyAmongPartitions() {
-    final KafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+    final KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
     final int partitionsNum = 50;
     for (int i = 0; i < partitionsNum; i++) {
       consumer.register(new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(i)), "0");
@@ -99,12 +92,14 @@ public class TestKafkaSystemConsumer {
     Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum, consumer.perPartitionFetchThreshold);
     Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_BYTES) / 2 / partitionsNum,
         consumer.perPartitionFetchThresholdBytes);
+
+    consumer.stop();
   }
 
   @Test
   public void testConsumerRegisterOlderOffsetOfTheSamzaSSP() {
 
-    KafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+    KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
 
     SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
     SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
@@ -137,7 +132,7 @@ public class TestKafkaSystemConsumer {
         bytesSerde.serialize("", "value1".getBytes()), ime1Size);
     IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key11".getBytes()),
         bytesSerde.serialize("", "value11".getBytes()), ime11Size);
-    KafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+    KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
 
     consumer.register(ssp0, "0");
     consumer.register(ssp1, "0");
@@ -156,6 +151,8 @@ public class TestKafkaSystemConsumer {
     Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
     Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0));
     Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1));
+
+    consumer.stop();
   }
 
   @Test
@@ -178,7 +175,7 @@ public class TestKafkaSystemConsumer {
 
     // limit by number of messages 4/2 = 2 per partition
     // limit by number of bytes - disabled
-    KafkaSystemConsumer consumer = setupConsumer("4", "0"); // should disable
+    KafkaSystemConsumer consumer = createConsumer("4", "0"); // should disable
 
     consumer.register(ssp0, "0");
     consumer.register(ssp1, "0");
@@ -197,6 +194,8 @@ public class TestKafkaSystemConsumer {
     Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
     Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0));
     Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1));
+
+    consumer.stop();
   }
 
   // mock kafkaConsumer and SystemConsumer
@@ -206,17 +205,12 @@ public class TestKafkaSystemConsumer {
     }
   }
 
-  static class MockKafkaSystmeCosumer extends KafkaSystemConsumer {
-    public MockKafkaSystmeCosumer(Consumer kafkaConsumer, String systemName, Config config, String clientId,
+  static class MockKafkaSystemConsumer extends KafkaSystemConsumer {
+    public MockKafkaSystemConsumer(Consumer kafkaConsumer, String systemName, Config config, String clientId,
         KafkaSystemConsumerMetrics metrics, Clock clock) {
       super(kafkaConsumer, systemName, config, clientId, metrics, clock);
     }
 
-    //@Override
-    //void createConsumerProxy() {
-    //  this.messageSink = new KafkaConsumerMessageSink();
-    //}
-
     @Override
     void startConsumer() {
     }