You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2020/06/29 18:42:01 UTC

[samza] branch master updated: SAMZA-2551: Upgrade all modules to automatically use checkstyle 6.11.2 (part 3: includes samza-kafka) (#1394)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new fcf1bbc  SAMZA-2551: Upgrade all modules to automatically use checkstyle 6.11.2 (part 3: includes samza-kafka) (#1394)
fcf1bbc is described below

commit fcf1bbc8fcc41fc38eb870dc16c10cf55e78d706
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Mon Jun 29 11:41:53 2020 -0700

    SAMZA-2551: Upgrade all modules to automatically use checkstyle 6.11.2 (part 3: includes samza-kafka) (#1394)
    
    API/Upgrade/Usage changes: None
---
 build.gradle                                       |   6 +
 checkstyle/checkstyle-suppressions.xml             |  11 +
 .../kafka/KafkaCheckpointLogKeySerde.java          |   6 +-
 .../apache/samza/config/KafkaConsumerConfig.java   |  30 ++-
 .../samza/system/kafka/KafkaConsumerProxy.java     |  11 +-
 .../system/kafka/KafkaConsumerProxyFactory.java    |   2 -
 .../apache/samza/system/kafka/KafkaStreamSpec.java |  18 +-
 .../samza/system/kafka/KafkaSystemAdmin.java       | 242 ++++++++++-----------
 .../samza/system/kafka/KafkaSystemConsumer.java    |   6 +-
 .../kafka/descriptors/KafkaSystemDescriptor.java   |   2 +-
 .../kafka/TestKafkaCheckpointManagerJava.java      |   2 +-
 .../samza/system/kafka/MockKafkaProducer.java      |  41 ++--
 .../kafka/TestKafkaCheckpointManagerFactory.java   |   1 -
 .../samza/system/kafka/TestKafkaStreamSpec.java    |   8 +-
 .../system/kafka/TestKafkaSystemAdminJava.java     |  24 +-
 .../system/kafka/TestKafkaSystemAdminWithMock.java |   2 +-
 .../system/kafka/TestKafkaSystemConsumer.java      |   5 +-
 .../kafka/TestKafkaSystemConsumerMetrics.java      |   5 +-
 .../system/kafka/TestKafkaSystemProducerJava.java  |  25 +--
 .../descriptors/TestKafkaInputDescriptor.java      |   5 +-
 .../descriptors/TestKafkaSystemDescriptor.java     |   3 +-
 21 files changed, 221 insertions(+), 234 deletions(-)

diff --git a/build.gradle b/build.gradle
index 25afaeb..7268838 100644
--- a/build.gradle
+++ b/build.gradle
@@ -397,6 +397,7 @@ project(":samza-tools_$scalaSuffix") {
 
 project(":samza-kafka_$scalaSuffix") {
   apply plugin: 'scala'
+  apply plugin: 'checkstyle'
 
   // Force scala joint compilation
   sourceSets.main.scala.srcDir "src/main/java"
@@ -434,6 +435,11 @@ project(":samza-kafka_$scalaSuffix") {
     testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
   }
 
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+    toolVersion = "$checkstyleVersion"
+  }
+
   test {
     // Bump up the heap so we can start ZooKeeper and Kafka brokers.
     minHeapSize = "1560m"
diff --git a/checkstyle/checkstyle-suppressions.xml b/checkstyle/checkstyle-suppressions.xml
index e887fdc..0c9bad5 100644
--- a/checkstyle/checkstyle-suppressions.xml
+++ b/checkstyle/checkstyle-suppressions.xml
@@ -25,8 +25,19 @@
   <suppress checks="ConstantName"
             files="ApplicationStatus.java"
             lines="26-29"/>
+  <!--
+    API javadocs (including descriptors) may reference classes (using '{@link <className>}') which
+    aren't directly used in the API class. In order to help the API javadoc look cleaner, we can use
+    the simple class name in the javadoc and then import that class. However, for non-API code, we
+    want checkstyle to prevent an import just for documentation purposes. The
+    "preventJavadocUnusedImports" id includes processing of javadocs when checking unused imports.
+    We will apply that to the API classes only.
+  -->
   <suppress id="preventJavadocUnusedImports"
             files=".*samza-api.*"/>
+  <suppress id="preventJavadocUnusedImports"
+            files="samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors" />
+
   <!-- suppress avro schema classes since they are based on auto-generated code -->
   <suppress files="samza-sql/src/test/java/org/apache/samza/sql/avro/schemas" checks=".*" />
 </suppressions>
diff --git a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
index 8e0c815..cc771d3 100644
--- a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
+++ b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
@@ -39,12 +39,12 @@ public class KafkaCheckpointLogKeySerde implements Serde<KafkaCheckpointLogKey>
   private static final String SSP_GROUPER_FACTORY_FIELD = "systemstreampartition-grouper-factory";
   private static final String TASK_NAME_FIELD = "taskName";
   private static final String TYPE_FIELD = "type";
-  private static final ObjectMapper mapper = new ObjectMapper();
+  private static final ObjectMapper MAPPER = new ObjectMapper();
 
   @Override
   public byte[] toBytes(KafkaCheckpointLogKey key) {
     try {
-      return mapper.writeValueAsBytes(ImmutableMap.of(
+      return MAPPER.writeValueAsBytes(ImmutableMap.of(
           SSP_GROUPER_FACTORY_FIELD, key.getGrouperFactoryClassName(),
           TASK_NAME_FIELD, key.getTaskName().toString(),
           TYPE_FIELD, key.getType()
@@ -57,7 +57,7 @@ public class KafkaCheckpointLogKeySerde implements Serde<KafkaCheckpointLogKey>
   @Override
   public KafkaCheckpointLogKey fromBytes(byte[] bytes) {
     try {
-      LinkedHashMap<String, String> deserializedKey = mapper.readValue(bytes, LinkedHashMap.class);
+      LinkedHashMap<String, String> deserializedKey = MAPPER.readValue(bytes, LinkedHashMap.class);
 
       if (!KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE.equals(deserializedKey.get(TYPE_FIELD))) {
         throw new IllegalArgumentException(String.format("Invalid key detected. Type of the key is %s", deserializedKey.get(TYPE_FIELD)));
diff --git a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
index 3ac84df..d7464c6 100644
--- a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
+++ b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,14 +15,11 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
-
 package org.apache.samza.config;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
@@ -128,7 +124,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
   }
 
   public int fetchMessageMaxBytes() {
-    String fetchSize = (String)get("fetch.message.max.bytes");
+    String fetchSize = (String) get("fetch.message.max.bytes");
     if (StringUtils.isBlank(fetchSize)) {
       return FETCH_MAX_BYTES;
     } else  {
@@ -177,26 +173,26 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
    */
   static String getAutoOffsetResetValue(final String autoOffsetReset, final String samzaOffsetDefault) {
     // valid kafka consumer values
-    final String KAFKA_OFFSET_LATEST = "latest";
-    final String KAFKA_OFFSET_EARLIEST = "earliest";
-    final String KAFKA_OFFSET_NONE = "none";
+    final String kafkaOffsetLatest = "latest";
+    final String kafkaOffsetEarliest = "earliest";
+    final String kafkaOffsetNone = "none";
 
     // if the value for KafkaConsumer is set - use it.
     if (!StringUtils.isBlank(autoOffsetReset)) {
-      if (autoOffsetReset.equals(KAFKA_OFFSET_EARLIEST) || autoOffsetReset.equals(KAFKA_OFFSET_LATEST)
-          || autoOffsetReset.equals(KAFKA_OFFSET_NONE)) {
+      if (autoOffsetReset.equals(kafkaOffsetEarliest) || autoOffsetReset.equals(kafkaOffsetLatest)
+          || autoOffsetReset.equals(kafkaOffsetNone)) {
         return autoOffsetReset;
       }
       // translate old kafka consumer values into new ones (SAMZA-1987 top remove it)
       String newAutoOffsetReset;
       switch (autoOffsetReset) {
         case "largest":
-          newAutoOffsetReset = KAFKA_OFFSET_LATEST;
-          LOG.warn("Using old (deprecated) value for kafka consumer config auto.offset.reset = {}. The right value should be {}", autoOffsetReset, KAFKA_OFFSET_LATEST);
+          newAutoOffsetReset = kafkaOffsetLatest;
+          LOG.warn("Using old (deprecated) value for kafka consumer config auto.offset.reset = {}. The right value should be {}", autoOffsetReset, kafkaOffsetLatest);
           break;
         case "smallest":
-          newAutoOffsetReset = KAFKA_OFFSET_EARLIEST;
-          LOG.warn("Using old (deprecated) value for kafka consumer config auto.offset.reset = {}. The right value should be {}", autoOffsetReset, KAFKA_OFFSET_EARLIEST);
+          newAutoOffsetReset = kafkaOffsetEarliest;
+          LOG.warn("Using old (deprecated) value for kafka consumer config auto.offset.reset = {}. The right value should be {}", autoOffsetReset, kafkaOffsetEarliest);
           break;
         default:
           throw new SamzaException("Using invalid value for kafka consumer config auto.offset.reset " + autoOffsetReset + ". See KafkaConsumer config for the correct values.");
@@ -207,14 +203,14 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
     }
 
     // in case kafka consumer configs are not provided we should match them to Samza's ones.
-    String newAutoOffsetReset = KAFKA_OFFSET_LATEST;
+    String newAutoOffsetReset = kafkaOffsetLatest;
     if (!StringUtils.isBlank(samzaOffsetDefault)) {
       switch (samzaOffsetDefault) {
         case SystemConfig.SAMZA_SYSTEM_OFFSET_UPCOMING:
-          newAutoOffsetReset = KAFKA_OFFSET_LATEST;
+          newAutoOffsetReset = kafkaOffsetLatest;
           break;
         case SystemConfig.SAMZA_SYSTEM_OFFSET_OLDEST:
-          newAutoOffsetReset = KAFKA_OFFSET_EARLIEST;
+          newAutoOffsetReset = kafkaOffsetEarliest;
           break;
         default:
           throw new SamzaException("Using invalid value for samza default offset config " + autoOffsetReset + ". See samza config for the correct values");
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 71ebe00..329073c 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,9 +15,7 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
-
 package org.apache.samza.system.kafka;
 
 import java.time.Instant;
@@ -124,12 +121,12 @@ public class KafkaConsumerProxy<K, V> {
 
     isRunning = false;
     try {
-      consumerPollThread.join(timeoutMs/2);
+      consumerPollThread.join(timeoutMs / 2);
       // join() may timeout
       // in this case we should interrupt it and wait again
       if (consumerPollThread.isAlive()) {
         consumerPollThread.interrupt();
-        consumerPollThread.join(timeoutMs/2);
+        consumerPollThread.join(timeoutMs / 2);
       }
     } catch (InterruptedException e) {
       LOG.warn("Join in KafkaConsumerProxy has failed", e);
@@ -452,13 +449,11 @@ public class KafkaConsumerProxy<K, V> {
     }
   }
 
-   @Override
+  @Override
   public String toString() {
     return String.format("consumerProxy-%s-%s", systemName, clientId);
   }
 
-
-
   /**
    * Used to create an instance of {@link KafkaConsumerProxy}. This can be overridden in case an extension of
    * {@link KafkaConsumerProxy} needs to be used within kafka system components like {@link KafkaSystemConsumer}.
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java
index cc4bddc..903e511 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,7 +15,6 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
 package org.apache.samza.system.kafka;
 
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
index d621308..3324c67 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
@@ -97,18 +97,18 @@ public class KafkaStreamSpec extends StreamSpec {
    */
   public static KafkaStreamSpec fromSpec(StreamSpec originalSpec) {
     if (originalSpec instanceof KafkaStreamSpec) {
-      return ((KafkaStreamSpec) originalSpec);
+      return (KafkaStreamSpec) originalSpec;
     }
 
-    int replicationFactor = Integer.parseInt(originalSpec.getOrDefault( KafkaConfig.TOPIC_REPLICATION_FACTOR(),
-                                                                        KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR()));
+    int replicationFactor = Integer.parseInt(originalSpec.getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR(),
+        KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR()));
 
-    return new KafkaStreamSpec( originalSpec.getId(),
-                                originalSpec.getPhysicalName(),
-                                originalSpec.getSystemName(),
-                                originalSpec.getPartitionCount(),
-                                replicationFactor,
-                                mapToProperties(filterUnsupportedProperties(originalSpec.getConfig())));
+    return new KafkaStreamSpec(originalSpec.getId(),
+                               originalSpec.getPhysicalName(),
+                               originalSpec.getSystemName(),
+                               originalSpec.getPartitionCount(),
+                               replicationFactor,
+                               mapToProperties(filterUnsupportedProperties(originalSpec.getConfig())));
   }
 
   /**
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index 91d4b11..a60752c 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -213,49 +213,49 @@ public class KafkaSystemAdmin implements SystemAdmin {
   public Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL) {
     // This optimization omits actual metadata for performance. Instead, we inject a dummy for all partitions.
     final SystemStreamMetadata.SystemStreamPartitionMetadata dummySspm =
-        new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null) {
-          String msg =
-              "getSystemStreamPartitionCounts does not populate SystemStreaMetadata info. Only number of partitions";
+      new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null) {
+        String msg =
+            "getSystemStreamPartitionCounts does not populate SystemStreaMetadata info. Only number of partitions";
 
-          @Override
-          public String getOldestOffset() {
-            throw new NotImplementedException(msg);
-          }
+        @Override
+        public String getOldestOffset() {
+          throw new NotImplementedException(msg);
+        }
 
-          @Override
-          public String getNewestOffset() {
-            throw new NotImplementedException(msg);
-          }
+        @Override
+        public String getNewestOffset() {
+          throw new NotImplementedException(msg);
+        }
 
-          @Override
-          public String getUpcomingOffset() {
-            throw new NotImplementedException(msg);
-          }
-        };
+        @Override
+        public String getUpcomingOffset() {
+          throw new NotImplementedException(msg);
+        }
+      };
 
     ExponentialSleepStrategy strategy = new ExponentialSleepStrategy(DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER,
         DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS, DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS);
 
     Function1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>> fetchMetadataOperation =
-        new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>>() {
-          @Override
-          public Map<String, SystemStreamMetadata> apply(ExponentialSleepStrategy.RetryLoop loop) {
-            Map<String, SystemStreamMetadata> allMetadata = new HashMap<>();
-
-            streamNames.forEach(streamName -> {
-              Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
-
-              List<PartitionInfo> partitionInfos = threadSafeKafkaConsumer.execute(consumer -> consumer.partitionsFor(streamName));
-              LOG.debug("Stream {} has partitions {}", streamName, partitionInfos);
-              partitionInfos.forEach(
-                  partitionInfo -> partitionMetadata.put(new Partition(partitionInfo.partition()), dummySspm));
-              allMetadata.put(streamName, new SystemStreamMetadata(streamName, partitionMetadata));
-            });
-
-            loop.done();
-            return allMetadata;
-          }
-        };
+      new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>>() {
+        @Override
+        public Map<String, SystemStreamMetadata> apply(ExponentialSleepStrategy.RetryLoop loop) {
+          Map<String, SystemStreamMetadata> allMetadata = new HashMap<>();
+
+          streamNames.forEach(streamName -> {
+            Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
+
+            List<PartitionInfo> partitionInfos = threadSafeKafkaConsumer.execute(consumer -> consumer.partitionsFor(streamName));
+            LOG.debug("Stream {} has partitions {}", streamName, partitionInfos);
+            partitionInfos.forEach(
+              partitionInfo -> partitionMetadata.put(new Partition(partitionInfo.partition()), dummySspm));
+            allMetadata.put(streamName, new SystemStreamMetadata(streamName, partitionMetadata));
+          });
+
+          loop.done();
+          return allMetadata;
+        }
+      };
 
     Map<String, SystemStreamMetadata> result = strategy.run(fetchMetadataOperation,
         new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>() {
@@ -323,51 +323,51 @@ public class KafkaSystemAdmin implements SystemAdmin {
 
     Function1<ExponentialSleepStrategy.RetryLoop, Map<SystemStreamPartition,
         SystemStreamMetadata.SystemStreamPartitionMetadata>> fetchTopicPartitionMetadataOperation =
-        new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<SystemStreamPartition,
-            SystemStreamMetadata.SystemStreamPartitionMetadata>>() {
-
-          @Override
-          public Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> apply(
-              ExponentialSleepStrategy.RetryLoop loop) {
-            OffsetsMaps topicPartitionsMetadata = fetchTopicPartitionsMetadata(topicPartitions);
-
-            Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> sspToSSPMetadata = new HashMap<>();
-            for (SystemStreamPartition ssp : ssps) {
-              String oldestOffset = topicPartitionsMetadata.getOldestOffsets().get(ssp);
-              String newestOffset = topicPartitionsMetadata.getNewestOffsets().get(ssp);
-              String upcomingOffset = topicPartitionsMetadata.getUpcomingOffsets().get(ssp);
-
-              sspToSSPMetadata.put(ssp,
-                  new SystemStreamMetadata.SystemStreamPartitionMetadata(oldestOffset, newestOffset, upcomingOffset));
-            }
-            loop.done();
-            return sspToSSPMetadata;
+      new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<SystemStreamPartition,
+          SystemStreamMetadata.SystemStreamPartitionMetadata>>() {
+
+        @Override
+        public Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> apply(
+            ExponentialSleepStrategy.RetryLoop loop) {
+          OffsetsMaps topicPartitionsMetadata = fetchTopicPartitionsMetadata(topicPartitions);
+
+          Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> sspToSSPMetadata = new HashMap<>();
+          for (SystemStreamPartition ssp : ssps) {
+            String oldestOffset = topicPartitionsMetadata.getOldestOffsets().get(ssp);
+            String newestOffset = topicPartitionsMetadata.getNewestOffsets().get(ssp);
+            String upcomingOffset = topicPartitionsMetadata.getUpcomingOffsets().get(ssp);
+
+            sspToSSPMetadata.put(ssp,
+                new SystemStreamMetadata.SystemStreamPartitionMetadata(oldestOffset, newestOffset, upcomingOffset));
           }
-        };
+          loop.done();
+          return sspToSSPMetadata;
+        }
+      };
 
     Function2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit> onExceptionRetryOperation =
-        new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>() {
-          @Override
-          public BoxedUnit apply(Exception exception, ExponentialSleepStrategy.RetryLoop loop) {
-            if (loop.sleepCount() < MAX_RETRIES_ON_EXCEPTION) {
-              LOG.warn(
-                  String.format("Fetching SSP metadata for: %s threw an exception. Retrying.", ssps), exception);
-            } else {
-              LOG.error(String.format("Fetching SSP metadata for: %s threw an exception.", ssps), exception);
-              loop.done();
-              throw new SamzaException(exception);
-            }
-            return null;
+      new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>() {
+        @Override
+        public BoxedUnit apply(Exception exception, ExponentialSleepStrategy.RetryLoop loop) {
+          if (loop.sleepCount() < MAX_RETRIES_ON_EXCEPTION) {
+            LOG.warn(
+                String.format("Fetching SSP metadata for: %s threw an exception. Retrying.", ssps), exception);
+          } else {
+            LOG.error(String.format("Fetching SSP metadata for: %s threw an exception.", ssps), exception);
+            loop.done();
+            throw new SamzaException(exception);
           }
-        };
+          return null;
+        }
+      };
 
     Function0<Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata>> fallbackOperation =
-        new AbstractFunction0<Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata>>() {
-          @Override
-          public Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> apply() {
-            throw new SamzaException("Failed to get SSP metadata");
-          }
-        };
+      new AbstractFunction0<Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata>>() {
+        @Override
+        public Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> apply() {
+          throw new SamzaException("Failed to get SSP metadata");
+        }
+      };
 
     return retryBackoff.run(fetchTopicPartitionMetadataOperation, onExceptionRetryOperation).getOrElse(fallbackOperation);
   }
@@ -389,41 +389,41 @@ public class KafkaSystemAdmin implements SystemAdmin {
     LOG.info("Fetching system stream metadata for {} from system {}", streamNames, systemName);
 
     Function1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>> fetchMetadataOperation =
-        new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>>() {
-          @Override
-          public Map<String, SystemStreamMetadata> apply(ExponentialSleepStrategy.RetryLoop loop) {
-            Map<String, SystemStreamMetadata> metadata = fetchSystemStreamMetadata(streamNames);
-            loop.done();
-            return metadata;
-          }
-        };
+      new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>>() {
+        @Override
+        public Map<String, SystemStreamMetadata> apply(ExponentialSleepStrategy.RetryLoop loop) {
+          Map<String, SystemStreamMetadata> metadata = fetchSystemStreamMetadata(streamNames);
+          loop.done();
+          return metadata;
+        }
+      };
 
     Function2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit> onExceptionRetryOperation =
-        new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>() {
-          @Override
-          public BoxedUnit apply(Exception exception, ExponentialSleepStrategy.RetryLoop loop) {
-            if (loop.sleepCount() < MAX_RETRIES_ON_EXCEPTION) {
-              LOG.warn(
-                  String.format("Fetching system stream metadata for: %s threw an exception. Retrying.", streamNames),
-                  exception);
-            } else {
-              LOG.error(String.format("Fetching system stream metadata for: %s threw an exception.", streamNames),
-                  exception);
-              loop.done();
-              throw new SamzaException(exception);
-            }
-
-            return null;
+      new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>() {
+        @Override
+        public BoxedUnit apply(Exception exception, ExponentialSleepStrategy.RetryLoop loop) {
+          if (loop.sleepCount() < MAX_RETRIES_ON_EXCEPTION) {
+            LOG.warn(
+                String.format("Fetching system stream metadata for: %s threw an exception. Retrying.", streamNames),
+                exception);
+          } else {
+            LOG.error(String.format("Fetching system stream metadata for: %s threw an exception.", streamNames),
+                exception);
+            loop.done();
+            throw new SamzaException(exception);
           }
-        };
+
+          return null;
+        }
+      };
 
     Function0<Map<String, SystemStreamMetadata>> fallbackOperation =
-        new AbstractFunction0<Map<String, SystemStreamMetadata>>() {
-          @Override
-          public Map<String, SystemStreamMetadata> apply() {
-            throw new SamzaException("Failed to get system stream metadata");
-          }
-        };
+      new AbstractFunction0<Map<String, SystemStreamMetadata>>() {
+        @Override
+        public Map<String, SystemStreamMetadata> apply() {
+          throw new SamzaException("Failed to get system stream metadata");
+        }
+      };
 
     return retryBackoff.run(fetchMetadataOperation, onExceptionRetryOperation).getOrElse(fallbackOperation);
   }
@@ -486,16 +486,16 @@ public class KafkaSystemAdmin implements SystemAdmin {
 
     topics.forEach(topic -> {
       OffsetsMaps offsetsForTopic = threadSafeKafkaConsumer.execute(consumer -> {
-         List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
-         if (partitionInfos == null) {
-           String msg = String.format("Partition info not(yet?) available for system %s topic %s", systemName, topic);
-           throw new SamzaException(msg);
-         }
-         List<TopicPartition> topicPartitions = partitionInfos.stream()
-          .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
-          .collect(Collectors.toList());
-         return fetchTopicPartitionsMetadata(topicPartitions);
-       });
+        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+        if (partitionInfos == null) {
+          String msg = String.format("Partition info not(yet?) available for system %s topic %s", systemName, topic);
+          throw new SamzaException(msg);
+        }
+        List<TopicPartition> topicPartitions = partitionInfos.stream()
+            .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
+            .collect(Collectors.toList());
+        return fetchTopicPartitionsMetadata(topicPartitions);
+      });
       allOldestOffsets.putAll(offsetsForTopic.getOldestOffsets());
       allNewestOffsets.putAll(offsetsForTopic.getNewestOffsets());
       allUpcomingOffsets.putAll(offsetsForTopic.getUpcomingOffsets());
@@ -516,7 +516,7 @@ public class KafkaSystemAdmin implements SystemAdmin {
   @Override
   public boolean createStream(StreamSpec streamSpec) {
     LOG.info("Creating Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName());
-    final String REPL_FACTOR = "replication.factor";
+    final String replFactor = "replication.factor";
 
     KafkaStreamSpec kafkaStreamSpec = toKafkaSpec(streamSpec);
     String topicName = kafkaStreamSpec.getPhysicalName();
@@ -527,11 +527,11 @@ public class KafkaSystemAdmin implements SystemAdmin {
     // specify the configs
     Map<String, String> streamConfig = new HashMap<>(kafkaStreamSpec.getConfig());
     // HACK - replication.factor is invalid config for AdminClient.createTopics
-    if (streamConfig.containsKey(REPL_FACTOR)) {
-      String repl = streamConfig.get(REPL_FACTOR);
+    if (streamConfig.containsKey(replFactor)) {
+      String repl = streamConfig.get(replFactor);
       LOG.warn("Configuration {}={} for topic={} is invalid. Using kSpec repl factor {}",
-          REPL_FACTOR, repl, kafkaStreamSpec.getPhysicalName(), kafkaStreamSpec.getReplicationFactor());
-      streamConfig.remove(REPL_FACTOR);
+          replFactor, repl, kafkaStreamSpec.getPhysicalName(), kafkaStreamSpec.getReplicationFactor());
+      streamConfig.remove(replFactor);
     }
     newTopic.configs(new MapConfig(streamConfig));
     CreateTopicsResult result = adminClient.createTopics(ImmutableSet.of(newTopic));
@@ -653,8 +653,8 @@ public class KafkaSystemAdmin implements SystemAdmin {
       Map<TopicPartition, RecordsToDelete> recordsToDelete = offsets.entrySet()
           .stream()
           .collect(Collectors.toMap(entry ->
-              new TopicPartition(entry.getKey().getStream(), entry.getKey().getPartition().getPartitionId()),
-              entry -> RecordsToDelete.beforeOffset(Long.parseLong(entry.getValue()) + 1)));
+            new TopicPartition(entry.getKey().getStream(), entry.getKey().getPartition().getPartitionId()),
+            entry -> RecordsToDelete.beforeOffset(Long.parseLong(entry.getValue()) + 1)));
 
       adminClient.deleteRecords(recordsToDelete).all().whenComplete((ignored, exception) -> {
         if (exception != null) {
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
index 3974e02..27bd638 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -1,6 +1,4 @@
-
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,9 +15,7 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
-
 package org.apache.samza.system.kafka;
 
 import java.util.HashMap;
@@ -158,7 +154,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
   void startConsumer() {
     // set the offset for each TopicPartition
     if (topicPartitionsToOffset.size() <= 0) {
-      LOG.error ("{}: Consumer is not subscribed to any SSPs", this);
+      LOG.error("{}: Consumer is not subscribed to any SSPs", this);
     }
 
     topicPartitionsToOffset.forEach((topicPartition, startingOffsetString) -> {
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java
index 8c4d48b..bf42320 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java
@@ -228,7 +228,7 @@ public class KafkaSystemDescriptor extends SystemDescriptor<KafkaSystemDescripto
   @Override
   public Map<String, String> toConfig() {
     Map<String, String> configs = new HashMap<>(super.toConfig());
-    if(!consumerZkConnect.isEmpty()) {
+    if (!consumerZkConnect.isEmpty()) {
       configs.put(String.format(CONSUMER_ZK_CONNECT_CONFIG_KEY, getSystemName()), String.join(",", consumerZkConnect));
     }
     consumerAutoOffsetResetOptional.ifPresent(consumerAutoOffsetReset ->
diff --git a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
index 4e52ced..1280e79 100644
--- a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
@@ -196,7 +196,7 @@ public class TestKafkaCheckpointManagerJava {
 
     // mock out a consumer that returns ten checkpoint IMEs for the same ssp
     List<List<IncomingMessageEnvelope>> pollOutputs = new ArrayList<>();
-    for(int offset = oldestOffset; offset <= newestOffset; offset++) {
+    for (int offset = oldestOffset; offset <= newestOffset; offset++) {
       pollOutputs.add(ImmutableList.of(newCheckpointEnvelope(TASK1, ssp, Integer.toString(offset))));
     }
 
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
index 95676b7..a28b23e 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
@@ -41,14 +41,13 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.test.TestUtils;
 
 public class MockKafkaProducer implements Producer<byte[], byte[]> {
 
-  private Cluster _cluster;
-  private List<FutureTask<RecordMetadata>> _callbacksList = new ArrayList<FutureTask<RecordMetadata>>();
+  private Cluster cluster;
+  private List<FutureTask<RecordMetadata>> callbacksList = new ArrayList<FutureTask<RecordMetadata>>();
   private boolean shouldBuffer = false;
   private boolean errorNext = false;
   private boolean errorInCallback = true;
@@ -70,7 +69,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
    *  - "Offset" in RecordMetadata is not guranteed to be correct
    */
   public MockKafkaProducer(int numNodes, String topicName, int numPartitions) {
-    this._cluster = TestUtils.clusterWith(numNodes, topicName, numPartitions);
+    this.cluster = TestUtils.clusterWith(numNodes, topicName, numPartitions);
   }
 
   public void setShouldBuffer(boolean shouldBuffer) {
@@ -110,7 +109,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
     if (errorNext) {
       if (!errorInCallback) {
         this.errorNext = false;
-        throw (RuntimeException)exception;
+        throw (RuntimeException) exception;
       }
       if (shouldBuffer) {
         FutureTask<RecordMetadata> f = new FutureTask<RecordMetadata>(new Callable<RecordMetadata>() {
@@ -121,7 +120,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
             return getRecordMetadata(record);
           }
         });
-        _callbacksList.add(f);
+        callbacksList.add(f);
         this.errorNext = false;
         return f;
       } else {
@@ -141,7 +140,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
             return metadata;
           }
         });
-        _callbacksList.add(f);
+        callbacksList.add(f);
         return f;
       } else {
         int offset = msgsSent.incrementAndGet();
@@ -154,7 +153,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
 
   @Override
   public List<PartitionInfo> partitionsFor(String topic) {
-    return this._cluster.partitionsForTopic(topic);
+    return this.cluster.partitionsForTopic(topic);
   }
 
   @Override
@@ -187,7 +186,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
     return openCount;
   }
 
-  public synchronized void flush () {
+  public synchronized void flush() {
     new FlushRunnable(0).run();
   }
 
@@ -248,11 +247,11 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
   private static class FutureSuccess implements Future<RecordMetadata> {
 
     private ProducerRecord record;
-    private final RecordMetadata _metadata;
+    private final RecordMetadata metadata;
 
     public FutureSuccess(ProducerRecord record, int offset) {
       this.record = record;
-      this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset, RecordBatch.NO_TIMESTAMP, -1L, -1, -1);
+      this.metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset, RecordBatch.NO_TIMESTAMP, -1L, -1, -1);
     }
 
     @Override
@@ -262,12 +261,12 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
 
     @Override
     public RecordMetadata get() throws ExecutionException {
-      return this._metadata;
+      return this.metadata;
     }
 
     @Override
     public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
-      return this._metadata;
+      return this.metadata;
     }
 
     @Override
@@ -282,21 +281,21 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
   }
 
   private class FlushRunnable implements Runnable {
-    private final int _sleepTime;
+    private final int sleepTime;
 
     public FlushRunnable(int sleepTime) {
-      _sleepTime = sleepTime;
+      this.sleepTime = sleepTime;
     }
 
     public void run() {
-      FutureTask[] callbackArray = new FutureTask[_callbacksList.size()];
-      AtomicReferenceArray<FutureTask> _bufferList =
-          new AtomicReferenceArray<FutureTask>(_callbacksList.toArray(callbackArray));
+      FutureTask[] callbackArray = new FutureTask[callbacksList.size()];
+      AtomicReferenceArray<FutureTask> bufferList =
+          new AtomicReferenceArray<FutureTask>(callbacksList.toArray(callbackArray));
       ExecutorService executor = Executors.newFixedThreadPool(10);
       try {
-        for (int i = 0; i < _bufferList.length(); i++) {
-          Thread.sleep(_sleepTime);
-          FutureTask f = _bufferList.get(i);
+        for (int i = 0; i < bufferList.length(); i++) {
+          Thread.sleep(sleepTime);
+          FutureTask f = bufferList.get(i);
           if (!f.isDone()) {
             executor.submit(f).get();
           }
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaCheckpointManagerFactory.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaCheckpointManagerFactory.java
index 1846ea8..2494fcc 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaCheckpointManagerFactory.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaCheckpointManagerFactory.java
@@ -19,7 +19,6 @@
 
 package org.apache.samza.system.kafka;
 
-import org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.KafkaConfig;
 import org.apache.samza.config.MapConfig;
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
index 14d2fe6..8a48283 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
@@ -22,19 +22,19 @@ import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.samza.system.StreamSpec;
-import org.apache.samza.util.TestStreamUtil;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
 
 /**
- * See also the general StreamSpec tests in {@link TestStreamUtil}
+ * See also the general StreamSpec tests in {@link org.apache.samza.util.TestStreamUtil}
  */
 public class TestKafkaStreamSpec {
 
   @Test
   public void testUnsupportedConfigStrippedFromProperties() {
-    StreamSpec original = new StreamSpec("dummyId","dummyPhysicalName", "dummySystemName", ImmutableMap.of("segment.bytes", "4", "replication.factor", "7"));
+    StreamSpec original = new StreamSpec("dummyId", "dummyPhysicalName", "dummySystemName",
+        ImmutableMap.of("segment.bytes", "4", "replication.factor", "7"));
 
     // First verify the original
     assertEquals("7", original.get("replication.factor"));
@@ -61,6 +61,6 @@ public class TestKafkaStreamSpec {
 
   @Test(expected = IllegalArgumentException.class)
   public void testInvalidPartitionCount() {
-    new KafkaStreamSpec("dummyId","dummyPhysicalName", "dummySystemName", 0);
+    new KafkaStreamSpec("dummyId", "dummyPhysicalName", "dummySystemName", 0);
   }
 }
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index 82d635f..cb8f34d 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -22,12 +22,10 @@ package org.apache.samza.system.kafka;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.kafka.clients.admin.DescribeConfigsResult;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -93,7 +91,7 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
 
     configResourceConfigMap.values().forEach(configEntry -> {
       configEntry.entries().forEach(config -> {
-          kafkaTopicConfig.put(config.name(), config.value());
+        kafkaTopicConfig.put(config.name(), config.value());
       });
     });
 
@@ -229,7 +227,7 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
 
   @Test
   public void testCreateCoordinatorStreamWithSpecialCharsInTopicName() {
-    final String STREAM = "test.coordinator_test.Stream";
+    final String stream = "test.coordinator_test.Stream";
 
     Map<String, String> map = new HashMap<>();
     map.put("job.coordinator.segment.bytes", "123");
@@ -239,14 +237,14 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
         String.valueOf(coordReplicatonFactor));
 
     KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(SYSTEM, map));
-    StreamSpec spec = StreamSpec.createCoordinatorStreamSpec(STREAM, SYSTEM);
+    StreamSpec spec = StreamSpec.createCoordinatorStreamSpec(stream, SYSTEM);
 
     Mockito.doAnswer(invocationOnMock -> {
       StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod();
       assertTrue(internalSpec instanceof KafkaStreamSpec);  // KafkaStreamSpec is used to carry replication factor
       assertTrue(internalSpec.isCoordinatorStream());
       assertEquals(SYSTEM, internalSpec.getSystemName());
-      assertEquals(STREAM, internalSpec.getPhysicalName());
+      assertEquals(stream, internalSpec.getPhysicalName());
       assertEquals(1, internalSpec.getPartitionCount());
       Assert.assertEquals(coordReplicatonFactor, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
       Assert.assertEquals("123", ((KafkaStreamSpec) internalSpec).getProperties().getProperty("segment.bytes"));
@@ -272,16 +270,16 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
   }
 
   public void testCreateChangelogStreamHelp(final String topic) {
-    final int PARTITIONS = 12;
-    final int REP_FACTOR = 2;
+    final int partitions = 12;
+    final int repFactor = 2;
 
     Map<String, String> map = new HashMap<>();
     map.put(JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM);
     map.put(String.format("stores.%s.changelog", "fakeStore"), topic);
-    map.put(String.format("stores.%s.changelog.replication.factor", "fakeStore"), String.valueOf(REP_FACTOR));
+    map.put(String.format("stores.%s.changelog.replication.factor", "fakeStore"), String.valueOf(repFactor));
     map.put(String.format("stores.%s.changelog.kafka.segment.bytes", "fakeStore"), "139");
     KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(SYSTEM, map));
-    StreamSpec spec = StreamSpec.createChangeLogStreamSpec(topic, SYSTEM, PARTITIONS);
+    StreamSpec spec = StreamSpec.createChangeLogStreamSpec(topic, SYSTEM, partitions);
 
     Mockito.doAnswer(invocationOnMock -> {
       StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod();
@@ -289,8 +287,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
       assertTrue(internalSpec.isChangeLogStream());
       assertEquals(SYSTEM, internalSpec.getSystemName());
       assertEquals(topic, internalSpec.getPhysicalName());
-      assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
-      assertEquals(PARTITIONS, internalSpec.getPartitionCount());
+      assertEquals(repFactor, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
+      assertEquals(partitions, internalSpec.getPartitionCount());
       assertEquals("139", ((KafkaStreamSpec) internalSpec).getProperties().getProperty("segment.bytes"));
       assertEquals("compact", ((KafkaStreamSpec) internalSpec).getProperties().getProperty("cleanup.policy"));
 
@@ -366,7 +364,7 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
   }
 
   @Test
-  public void testShouldAssembleMetadata () {
+  public void testShouldAssembleMetadata() {
     Map<SystemStreamPartition, String> oldestOffsets = new ImmutableMap.Builder<SystemStreamPartition, String>()
         .put(new SystemStreamPartition(SYSTEM, "stream1", new Partition(0)), "o1")
         .put(new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)), "o2")
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java
index 25cab8c..cd1e707 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java
@@ -336,7 +336,7 @@ public class TestKafkaSystemAdminWithMock {
   }
 
   @Test(expected = SamzaException.class)
-  public void testGetSSPMetadataShouldTerminateAfterFiniteRetriesOnException() throws Exception{
+  public void testGetSSPMetadataShouldTerminateAfterFiniteRetriesOnException() throws Exception {
     SystemStreamPartition oneSSP = new SystemStreamPartition(TEST_SYSTEM, VALID_TOPIC, new Partition(0));
     SystemStreamPartition otherSSP = new SystemStreamPartition(TEST_SYSTEM, "otherTopic", new Partition(1));
 
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
index f5b1e8e..dd20248 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,9 +15,7 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
-
 package org.apache.samza.system.kafka;
 
 import java.util.HashMap;
@@ -173,7 +170,7 @@ public class TestKafkaSystemConsumer {
     int partitionsNum = 2;
     int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // fake size, upto the limit
     int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 100; // fake size, below the limit
-    int ime11Size = 20;// event with the second message still below the size limit
+    int ime11Size = 20; // event with the second message still below the size limit
     ByteArraySerializer bytesSerde = new ByteArraySerializer();
     IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", bytesSerde.serialize("", "key0".getBytes()),
         bytesSerde.serialize("", "value0".getBytes()), ime0Size);
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java
index 03b0564..5cc6f84 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,9 +15,7 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
-
 package org.apache.samza.system.kafka;
 
 import java.util.HashMap;
@@ -96,7 +93,7 @@ public class TestKafkaSystemConsumerMetrics {
 
   protected static void validate(Map<String, Metric> metricMap, Map<String, String> expectedValues) {
     // match the expected value, set in the test above, and the value in the metrics
-    for(Map.Entry<String, String> e: expectedValues.entrySet()) {
+    for (Map.Entry<String, String> e : expectedValues.entrySet()) {
       String metricName = e.getKey();
       String expectedValue = e.getValue();
       // get the metric from the registry
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
index 7fc450d..73673d3 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
@@ -27,7 +27,6 @@ import org.apache.samza.util.ExponentialSleepStrategy;
 import org.junit.Test;
 import scala.runtime.AbstractFunction0;
 
-import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertTrue;
 
 
@@ -38,19 +37,19 @@ public class TestKafkaSystemProducerJava {
   @Test
   public void testInstantiateProducer() {
     KafkaSystemProducer ksp = new KafkaSystemProducer("SysName", new ExponentialSleepStrategy(2.0, 200, 10000),
-        new AbstractFunction0<Producer<byte[], byte[]>>() {
-          @Override
-          public Producer<byte[], byte[]> apply() {
-            return new KafkaProducer<>(new HashMap<String, Object>());
-          }
-        }, new KafkaSystemProducerMetrics("SysName", new MetricsRegistryMap()), new AbstractFunction0<Object>() {
-      @Override
-      public Object apply() {
-        return System.currentTimeMillis();
-      }
-    }, false);
+      new AbstractFunction0<Producer<byte[], byte[]>>() {
+        @Override
+        public Producer<byte[], byte[]> apply() {
+          return new KafkaProducer<>(new HashMap<String, Object>());
+        }
+      }, new KafkaSystemProducerMetrics("SysName", new MetricsRegistryMap()), new AbstractFunction0<Object>() {
+        @Override
+        public Object apply() {
+          return System.currentTimeMillis();
+        }
+      }, false);
 
     long now = System.currentTimeMillis();
-    assertTrue((Long)ksp.clock().apply() >= now);
+    assertTrue((Long) ksp.clock().apply() >= now);
   }
 }
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/descriptors/TestKafkaInputDescriptor.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/descriptors/TestKafkaInputDescriptor.java
index 5bce72d..3992e90 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/descriptors/TestKafkaInputDescriptor.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/descriptors/TestKafkaInputDescriptor.java
@@ -25,12 +25,9 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 public class TestKafkaInputDescriptor {
   @Test
@@ -42,7 +39,7 @@ public class TestKafkaInputDescriptor {
             .withConsumerAutoOffsetReset("largest")
             .withConsumerFetchMessageMaxBytes(1024 * 1024);
 
-    Map<String, String> generatedConfigs = isd.toConfig();;
+    Map<String, String> generatedConfigs = isd.toConfig();
     assertEquals("kafka", generatedConfigs.get("streams.input-stream.samza.system"));
     assertEquals("largest", generatedConfigs.get("systems.kafka.streams.input-stream.consumer.auto.offset.reset"));
     assertEquals("1048576", generatedConfigs.get("systems.kafka.streams.input-stream.consumer.fetch.message.max.bytes"));
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/descriptors/TestKafkaSystemDescriptor.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/descriptors/TestKafkaSystemDescriptor.java
index 31469f8..2a31198 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/descriptors/TestKafkaSystemDescriptor.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/descriptors/TestKafkaSystemDescriptor.java
@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableMap;
 
 import java.util.Map;
 import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -37,7 +36,7 @@ public class TestKafkaSystemDescriptor {
             .withProducerBootstrapServers(ImmutableList.of("localhost:567", "localhost:890"))
             .withDefaultStreamOffsetDefault(SystemStreamMetadata.OffsetType.OLDEST)
             .withConsumerAutoOffsetReset("smallest")
-            .withConsumerFetchMessageMaxBytes(1024*1024)
+            .withConsumerFetchMessageMaxBytes(1024 * 1024)
             .withSamzaFetchThreshold(10000)
             .withSamzaFetchThresholdBytes(1024 * 1024)
             .withConsumerConfigs(ImmutableMap.of("custom-consumer-config-key", "custom-consumer-config-value"))