Posted to commits@samza.apache.org by ja...@apache.org on 2019/03/20 19:52:10 UTC

[samza] branch master updated: SAMZA-2105: [ELASTICSEARCH, HDFS, KAFKA] code cleanup and refactoring

This is an automated email from the ASF dual-hosted git repository.

jagadish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 380c976  SAMZA-2105: [ELASTICSEARCH, HDFS, KAFKA] code cleanup and refactoring
380c976 is described below

commit 380c9762e683d2c755b7d85f484e338d846132a7
Author: strkkk <an...@gmail.com>
AuthorDate: Wed Mar 20 12:52:01 2019 -0700

    SAMZA-2105: [ELASTICSEARCH, HDFS, KAFKA] code cleanup and refactoring
    
    1. Added/removed modifiers (e.g., final on fields; redundant public on interface members)
    2. Replaced Guava Optional with java.util.Optional and removed Guava from the samza-elasticsearch module dependencies. Guava's Optional was added in https://issues.apache.org/jira/browse/SAMZA-853, but it is not clear why it would be preferable to the standard API (a minimal sketch of the migration follows this list).
    3. Simplified a few code snippets
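
    A minimal stand-alone sketch of the java.util.Optional patterns this
    commit applies (the config key and class names below are illustrative,
    not taken from the Samza code):

        import java.util.Map;
        import java.util.Optional;

        public class OptionalMigrationSketch {
          private static final Map<String, String> CONFIG = Map.of("transport.host", "localhost");

          // Guava needed Optional.absent()/fromNullable() or an explicit
          // containsKey()/get() branch; java.util.Optional.ofNullable
          // collapses the null check into a single call.
          static Optional<String> getTransportHost() {
            return Optional.ofNullable(CONFIG.get("transport.host"));
          }

          public static void main(String[] args) {
            // ifPresent replaces the isPresent()/get() pair.
            getTransportHost().ifPresent(host -> System.out.println("host=" + host));

            // orElseThrow replaces the if/else-with-throw pattern.
            String host = getTransportHost()
                .orElseThrow(() -> new IllegalStateException("transport host must be configured"));
            System.out.println(host);
          }
        }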
    
    Author: strkkk <an...@gmail.com>
    
    Reviewers: Jagadish <ja...@apache.org>
    
    Closes #921 from strkkk/hdfs_es_kfka
---
 build.gradle                                       |  1 -
 .../apache/samza/config/ElasticsearchConfig.java   | 22 ++++-------
 .../system/elasticsearch/BulkProcessorFactory.java | 16 ++++----
 .../elasticsearch/ElasticsearchSystemProducer.java |  2 +-
 .../client/TransportClientFactory.java             | 17 ++-------
 .../indexrequest/DefaultIndexRequestFactory.java   | 41 +++++---------------
 .../apache/samza/system/hdfs/HdfsSystemAdmin.java  |  8 ++--
 .../samza/system/hdfs/HdfsSystemConsumer.java      | 15 +++-----
 .../hdfs/descriptors/HdfsSystemDescriptor.java     | 44 +++++++++++-----------
 .../hdfs/partitioner/DirectoryPartitioner.java     | 25 ++++--------
 .../system/hdfs/partitioner/FileSystemAdapter.java |  8 ++--
 .../system/hdfs/reader/MultiFileHdfsReader.java    |  4 +-
 .../system/hdfs/reader/SingleFileHdfsReader.java   | 12 +++---
 .../apache/samza/config/KafkaConsumerConfig.java   | 10 ++---
 .../apache/samza/system/kafka/KafkaStreamSpec.java |  8 ++--
 .../samza/system/kafka/KafkaSystemAdmin.java       | 18 +++------
 .../kafka/descriptors/KafkaInputDescriptor.java    |  4 +-
 .../kafka/descriptors/KafkaSystemDescriptor.java   |  4 +-
 .../samza/system/kafka/KafkaConsumerProxy.java     | 20 ++++------
 19 files changed, 104 insertions(+), 175 deletions(-)

diff --git a/build.gradle b/build.gradle
index 29fba39..f5d9f51 100644
--- a/build.gradle
+++ b/build.gradle
@@ -306,7 +306,6 @@ project(":samza-elasticsearch_$scalaSuffix") {
     compile project(':samza-api')
     compile project(":samza-core_$scalaSuffix")
     compile "org.elasticsearch:elasticsearch:$elasticsearchVersion"
-    compile "com.google.guava:guava:$guavaVersion"
     compile "org.slf4j:slf4j-api:$slf4jVersion"
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-core:$mockitoVersion"
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java b/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java
index 75bf4c7..b062e24 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java
@@ -20,7 +20,7 @@
 package org.apache.samza.config;
 
 import org.apache.samza.SamzaException;
-import com.google.common.base.Optional;
+import java.util.Optional;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,27 +67,19 @@ public class ElasticsearchConfig extends MapConfig {
 
   // Index Request
   public Optional<String> getIndexRequestFactoryClassName() {
-    if (containsKey(CONFIG_KEY_INDEX_REQUEST_FACTORY)) {
-      return Optional.of(get(CONFIG_KEY_INDEX_REQUEST_FACTORY));
-    } else {
-      return Optional.absent();
-    }
+    return Optional.ofNullable(get(CONFIG_KEY_INDEX_REQUEST_FACTORY));
   }
 
   // Transport client settings
   public Optional<String> getTransportHost() {
-    if (containsKey(CONFIG_KEY_CLIENT_TRANSPORT_HOST)) {
-      return Optional.of(get(CONFIG_KEY_CLIENT_TRANSPORT_HOST));
-    } else {
-      return Optional.absent();
-    }
+    return Optional.ofNullable(get(CONFIG_KEY_CLIENT_TRANSPORT_HOST));
   }
 
   public Optional<Integer> getTransportPort() {
     if (containsKey(CONFIG_KEY_CLIENT_TRANSPORT_PORT)) {
       return Optional.of(getInt(CONFIG_KEY_CLIENT_TRANSPORT_PORT));
     } else {
-      return Optional.absent();
+      return Optional.empty();
     }
   }
 
@@ -96,7 +88,7 @@ public class ElasticsearchConfig extends MapConfig {
     if (containsKey(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
       return Optional.of(getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
     } else {
-      return Optional.absent();
+      return Optional.empty();
     }
   }
 
@@ -104,7 +96,7 @@ public class ElasticsearchConfig extends MapConfig {
     if (containsKey(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
       return Optional.of(getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB));
     } else {
-      return Optional.absent();
+      return Optional.empty();
     }
   }
 
@@ -112,7 +104,7 @@ public class ElasticsearchConfig extends MapConfig {
     if (containsKey(CONFIG_KEY_BULK_FLUSH_INTERVALS_MS)) {
       return Optional.of(getInt(CONFIG_KEY_BULK_FLUSH_INTERVALS_MS));
     } else {
-      return Optional.absent();
+      return Optional.empty();
     }
   }
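
Of note in the hunks above: the String-valued getters collapse to a single
Optional.ofNullable call, while the int-valued getters keep their containsKey
branch, because getInt returns a primitive int, so there is no null to wrap. A
typed helper could unify both, roughly along these lines (a sketch under that
assumption, not code from the commit):

    import java.util.Map;
    import java.util.Optional;
    import java.util.function.Function;

    public class TypedOptionalSketch {
      private final Map<String, String> raw;

      TypedOptionalSketch(Map<String, String> raw) {
        this.raw = raw;
      }

      // Works directly for String values: a missing key yields null -> empty.
      Optional<String> getString(String key) {
        return Optional.ofNullable(raw.get(key));
      }

      // For typed values, parse via map() instead of guarding with containsKey.
      <T> Optional<T> get(String key, Function<String, T> parser) {
        return Optional.ofNullable(raw.get(key)).map(parser);
      }

      public static void main(String[] args) {
        TypedOptionalSketch cfg = new TypedOptionalSketch(Map.of("bulk.flush.max.actions", "500"));
        System.out.println(cfg.get("bulk.flush.max.actions", Integer::parseInt).orElse(-1)); // 500
        System.out.println(cfg.get("bulk.flush.interval.ms", Integer::parseInt).orElse(-1)); // -1
      }
    }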
 
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/BulkProcessorFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/BulkProcessorFactory.java
index 0027531..2ceb899 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/BulkProcessorFactory.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/BulkProcessorFactory.java
@@ -43,15 +43,13 @@ public class BulkProcessorFactory {
     // This also means BulkProcessor#flush() is blocking as is also required.
     builder.setConcurrentRequests(0);
 
-    if (config.getBulkFlushMaxActions().isPresent()) {
-      builder.setBulkActions(config.getBulkFlushMaxActions().get());
-    }
-    if (config.getBulkFlushMaxSizeMB().isPresent()) {
-      builder.setBulkSize(new ByteSizeValue(config.getBulkFlushMaxSizeMB().get(), ByteSizeUnit.MB));
-    }
-    if (config.getBulkFlushIntervalMS().isPresent()) {
-      builder.setFlushInterval(TimeValue.timeValueMillis(config.getBulkFlushIntervalMS().get()));
-    }
+    config.getBulkFlushMaxActions().ifPresent(builder::setBulkActions);
+    config.getBulkFlushMaxSizeMB().ifPresent(size ->
+      builder.setBulkSize(new ByteSizeValue(size, ByteSizeUnit.MB))
+    );
+    config.getBulkFlushIntervalMS().ifPresent(interval ->
+      builder.setFlushInterval(TimeValue.timeValueMillis(interval))
+    );
 
     return builder.build();
   }
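
The ifPresent calls above pick between a method reference and a lambda
depending on whether the present value feeds straight into the setter. In
isolation (the builder and interval types below are stand-ins, not the
Elasticsearch BulkProcessor API):

    import java.util.Optional;

    public class IfPresentSketch {
      // Stand-ins shaped like the hunk above (not the Elasticsearch API).
      static class Interval {
        final long ms;
        Interval(long ms) { this.ms = ms; }
      }

      static class Builder {
        void setBulkActions(int n) { System.out.println("bulkActions=" + n); }
        void setFlushInterval(Interval i) { System.out.println("flushIntervalMs=" + i.ms); }
      }

      public static void main(String[] args) {
        Builder builder = new Builder();
        Optional<Integer> maxActions = Optional.of(1000);
        Optional<Long> flushIntervalMs = Optional.of(250L);

        // Value feeds straight into the setter: a method reference suffices.
        maxActions.ifPresent(builder::setBulkActions);
        // Value must be wrapped first (like TimeValue.timeValueMillis): use a lambda.
        flushIntervalMs.ifPresent(ms -> builder.setFlushInterval(new Interval(ms)));
      }
    }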
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
index 7672ee8..001dabf 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
@@ -67,7 +67,7 @@ public class ElasticsearchSystemProducer implements SystemProducer {
   private final BulkProcessorFactory bulkProcessorFactory;
   private final ElasticsearchSystemProducerMetrics metrics;
 
-  private Client client;
+  private final Client client;
 
   public ElasticsearchSystemProducer(String system, BulkProcessorFactory bulkProcessorFactory,
                                      Client client, IndexRequestFactory indexRequestFactory,
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/TransportClientFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/TransportClientFactory.java
index e336ad9..5920b8f 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/TransportClientFactory.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/TransportClientFactory.java
@@ -47,20 +47,11 @@ public class TransportClientFactory implements ClientFactory {
 
   public TransportClientFactory(ElasticsearchConfig config) {
     clientSettings = config.getElasticseachSettings();
+    transportHost = config.getTransportHost().orElseThrow(() ->
+      new SamzaException("You must specify the transport host for TransportClientFactory with the Elasticsearch system."));
 
-    if (config.getTransportHost().isPresent()) {
-      transportHost = config.getTransportHost().get();
-    } else {
-      throw new SamzaException("You must specify the transport host for TransportClientFactory"
-                               + "with the Elasticsearch system.");
-    }
-
-    if (config.getTransportPort().isPresent()) {
-      transportPort = config.getTransportPort().get();
-    } else {
-      throw new SamzaException("You must specify the transport port for TransportClientFactory"
-                               + "with the Elasticsearch system.");
-    }
+    transportPort = config.getTransportPort().orElseThrow(() ->
+      new SamzaException("You must specify the transport port for TransportClientFactory with the Elasticsearch system."));
   }
 
   @Override
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
index 7f1e884..7befb3f 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
@@ -23,10 +23,10 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Requests;
-import com.google.common.base.Optional;
 import org.elasticsearch.index.VersionType;
 
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * The default {@link IndexRequestFactory}.
@@ -58,25 +58,10 @@ public class DefaultIndexRequestFactory implements IndexRequestFactory {
   public IndexRequest getIndexRequest(OutgoingMessageEnvelope envelope) {
     IndexRequest indexRequest = getRequest(envelope);
 
-    Optional<String> id = getId(envelope);
-    if (id.isPresent()) {
-      indexRequest.id(id.get());
-    }
-
-    Optional<String> routingKey = getRoutingKey(envelope);
-    if (routingKey.isPresent()) {
-      indexRequest.routing(routingKey.get());
-    }
-
-    Optional<Long> version = getVersion(envelope);
-    if (version.isPresent()) {
-      indexRequest.version(version.get());
-    }
-
-    Optional<VersionType> versionType = getVersionType(envelope);
-    if (versionType.isPresent()) {
-      indexRequest.versionType(versionType.get());
-    }
+    getId(envelope).ifPresent(indexRequest::id);
+    getRoutingKey(envelope).ifPresent(indexRequest::routing);
+    getVersion(envelope).ifPresent(indexRequest::version);
+    getVersionType(envelope).ifPresent(indexRequest::versionType);
 
     setSource(envelope, indexRequest);
 
@@ -94,27 +79,19 @@ public class DefaultIndexRequestFactory implements IndexRequestFactory {
   }
 
   protected Optional<String> getId(OutgoingMessageEnvelope envelope) {
-    Object id = envelope.getKey();
-    if (id == null) {
-      return Optional.absent();
-    }
-    return Optional.of(id.toString());
+    return Optional.ofNullable(envelope.getKey()).map(Object::toString);
   }
 
   protected Optional<String> getRoutingKey(OutgoingMessageEnvelope envelope) {
-    Object partitionKey = envelope.getPartitionKey();
-    if (partitionKey == null) {
-      return Optional.absent();
-    }
-    return Optional.of(partitionKey.toString());
+    return Optional.ofNullable(envelope.getPartitionKey()).map(Object::toString);
   }
 
   protected Optional<Long> getVersion(OutgoingMessageEnvelope envelope) {
-    return Optional.absent();
+    return Optional.empty();
   }
 
   protected Optional<VersionType> getVersionType(OutgoingMessageEnvelope envelope) {
-    return Optional.absent();
+    return Optional.empty();
   }
 
   protected void setSource(OutgoingMessageEnvelope envelope, IndexRequest indexRequest) {
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
index 0d50f26..7ffbfc7 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
@@ -91,10 +91,10 @@ import org.slf4j.LoggerFactory;
 public class HdfsSystemAdmin implements SystemAdmin {
   private static final Logger LOG = LoggerFactory.getLogger(HdfsSystemAdmin.class);
 
-  private HdfsConfig hdfsConfig;
-  private DirectoryPartitioner directoryPartitioner;
-  private String stagingDirectory; // directory that contains the partition description
-  private HdfsReaderFactory.ReaderType readerType;
+  private final HdfsConfig hdfsConfig;
+  private final DirectoryPartitioner directoryPartitioner;
+  private final String stagingDirectory; // directory that contains the partition description
+  private final HdfsReaderFactory.ReaderType readerType;
 
   public HdfsSystemAdmin(String systemName, Config config) {
     hdfsConfig = new HdfsConfig(config);
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
index 92457ab..1ceb5d6 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
@@ -22,7 +22,6 @@ package org.apache.samza.system.hdfs;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -103,9 +102,9 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap {
    * (stream2) -> (P0) -> "hdfs://user/samzauser/2/datafile01.avro"
    * ...
    */
-  private LoadingCache<String, Map<Partition, List<String>>> cachedPartitionDescriptorMap;
-  private Map<SystemStreamPartition, MultiFileHdfsReader> readers;
-  private Map<SystemStreamPartition, Future> readerRunnableStatus;
+  private final LoadingCache<String, Map<Partition, List<String>>> cachedPartitionDescriptorMap;
+  private final Map<SystemStreamPartition, MultiFileHdfsReader> readers;
+  private final Map<SystemStreamPartition, Future> readerRunnableStatus;
 
   /**
    * Whether the {@link org.apache.samza.system.hdfs.HdfsSystemConsumer} is notified
@@ -130,8 +129,7 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap {
     this.consumerMetrics = consumerMetrics;
     cachedPartitionDescriptorMap = CacheBuilder.newBuilder().build(new CacheLoader<String, Map<Partition, List<String>>>() {
         @Override
-        public Map<Partition, List<String>> load(String streamName)
-          throws Exception {
+        public Map<Partition, List<String>> load(String streamName) {
           Validate.notEmpty(streamName);
           if (StringUtils.isBlank(stagingDirectory)) {
             throw new SamzaException("Staging directory can't be empty. "
@@ -151,8 +149,7 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap {
   public void start() {
     LOG.info(String.format("HdfsSystemConsumer started with %d readers", readers.size()));
     executorService = Executors.newCachedThreadPool();
-    readers.entrySet().forEach(
-      entry -> readerRunnableStatus.put(entry.getKey(), executorService.submit(new ReaderRunnable(entry.getValue()))));
+    readers.forEach((key, value) -> readerRunnableStatus.put(key, executorService.submit(new ReaderRunnable(value))));
   }
 
   /**
@@ -275,7 +272,7 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap {
   }
 
   private class ReaderRunnable implements Runnable {
-    public MultiFileHdfsReader reader;
+    public final MultiFileHdfsReader reader;
 
     public ReaderRunnable(MultiFileHdfsReader reader) {
       this.reader = reader;
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsSystemDescriptor.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsSystemDescriptor.java
index f4d8566..fd63f79 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsSystemDescriptor.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsSystemDescriptor.java
@@ -92,7 +92,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor>
    * @return this system descriptor
    */
   public HdfsSystemDescriptor withDatePathFormat(String datePathFormat) {
-    this.datePathFormat = Optional.of(StringUtils.stripToNull(datePathFormat));
+    this.datePathFormat = Optional.ofNullable(StringUtils.stripToNull(datePathFormat));
     return this;
   }
 
@@ -102,7 +102,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor>
    * @return this system descriptor
    */
   public HdfsSystemDescriptor withOutputBaseDir(String outputBaseDir) {
-    this.outputBaseDir = Optional.of(StringUtils.stripToNull(outputBaseDir));
+    this.outputBaseDir = Optional.ofNullable(StringUtils.stripToNull(outputBaseDir));
     return this;
   }
 
@@ -135,7 +135,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor>
    * @return this system descriptor
    */
   public HdfsSystemDescriptor withWriteCompressionType(String writeCompressionType) {
-    this.writeCompressionType = Optional.of(StringUtils.stripToNull(writeCompressionType));
+    this.writeCompressionType = Optional.ofNullable(StringUtils.stripToNull(writeCompressionType));
     return this;
   }
 
@@ -145,7 +145,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor>
    * @return this system descriptor
    */
   public HdfsSystemDescriptor withWriterClassName(String writerClassName) {
-    this.writerClass = Optional.of(StringUtils.stripToNull(writerClassName));
+    this.writerClass = Optional.ofNullable(StringUtils.stripToNull(writerClassName));
     return this;
   }
 
@@ -175,7 +175,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor>
    * @return this system descriptor
    */
   public HdfsSystemDescriptor withConsumerWhiteList(String whiteList) {
-    this.consumerWhiteList = Optional.of(StringUtils.stripToNull(whiteList));
+    this.consumerWhiteList = Optional.ofNullable(StringUtils.stripToNull(whiteList));
     return this;
   }
 
@@ -185,7 +185,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor>
    * @return this system descriptor
    */
   public HdfsSystemDescriptor withConsumerBlackList(String blackList) {
-    this.consumerBlackList = Optional.of(StringUtils.stripToNull(blackList));
+    this.consumerBlackList = Optional.ofNullable(StringUtils.stripToNull(blackList));
     return this;
   }
 
@@ -195,7 +195,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor>
    * @return this system descriptor
    */
   public HdfsSystemDescriptor withConsumerGroupPattern(String groupPattern) {
-    this.consumerGroupPattern = Optional.of(StringUtils.stripToNull(groupPattern));
+    this.consumerGroupPattern = Optional.ofNullable(StringUtils.stripToNull(groupPattern));
     return this;
   }
 
@@ -205,7 +205,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor>
    * @return this system descriptor
    */
   public HdfsSystemDescriptor withReaderType(String readerType) {
-    this.consumerReader = Optional.of(StringUtils.stripToNull(readerType));
+    this.consumerReader = Optional.ofNullable(StringUtils.stripToNull(readerType));
     return this;
   }
 
@@ -216,7 +216,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor>
    * @return this system descriptor
    */
   public HdfsSystemDescriptor withStagingDirectory(String stagingDirectory) {
-    this.consumerStagingDirectory = Optional.of(StringUtils.stripToNull(stagingDirectory));
+    this.consumerStagingDirectory = Optional.ofNullable(StringUtils.stripToNull(stagingDirectory));
     return this;
   }
 
@@ -225,29 +225,29 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor>
     Map<String, String> config = new HashMap<>(super.toConfig());
     String systemName = getSystemName();
 
-    this.datePathFormat.ifPresent(
+    datePathFormat.ifPresent(
         val -> config.put(String.format(HdfsConfig.DATE_PATH_FORMAT_STRING(), systemName), val));
-    this.outputBaseDir.ifPresent(val -> config.put(String.format(HdfsConfig.BASE_OUTPUT_DIR(), systemName), val));
-    this.writeBatchSizeBytes.ifPresent(
+    outputBaseDir.ifPresent(val -> config.put(String.format(HdfsConfig.BASE_OUTPUT_DIR(), systemName), val));
+    writeBatchSizeBytes.ifPresent(
         val -> config.put(String.format(HdfsConfig.WRITE_BATCH_SIZE_BYTES(), systemName), String.valueOf(val)));
-    this.writeBatchSizeRecords.ifPresent(
+    writeBatchSizeRecords.ifPresent(
         val -> config.put(String.format(HdfsConfig.WRITE_BATCH_SIZE_RECORDS(), systemName), String.valueOf(val)));
-    this.writeCompressionType.ifPresent(
+    writeCompressionType.ifPresent(
         val -> config.put(String.format(HdfsConfig.COMPRESSION_TYPE(), systemName), val));
-    this.writerClass.ifPresent(val -> config.put(String.format(HdfsConfig.HDFS_WRITER_CLASS_NAME(), systemName), val));
+    writerClass.ifPresent(val -> config.put(String.format(HdfsConfig.HDFS_WRITER_CLASS_NAME(), systemName), val));
 
-    this.consumerBufferCapacity.ifPresent(
+    consumerBufferCapacity.ifPresent(
         val -> config.put(String.format(HdfsConfig.CONSUMER_BUFFER_CAPACITY(), systemName), String.valueOf(val)));
-    this.consumerMaxRetries.ifPresent(
+    consumerMaxRetries.ifPresent(
         val -> config.put(String.format(HdfsConfig.CONSUMER_NUM_MAX_RETRIES(), systemName), String.valueOf(val)));
-    this.consumerWhiteList.ifPresent(
+    consumerWhiteList.ifPresent(
         val -> config.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST(), systemName), val));
-    this.consumerBlackList.ifPresent(
+    consumerBlackList.ifPresent(
         val -> config.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_BLACKLIST(), systemName), val));
-    this.consumerGroupPattern.ifPresent(
+    consumerGroupPattern.ifPresent(
         val -> config.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_GROUP_PATTERN(), systemName), val));
-    this.consumerReader.ifPresent(val -> config.put(String.format(HdfsConfig.FILE_READER_TYPE(), systemName), val));
-    this.consumerStagingDirectory.ifPresent(
+    consumerReader.ifPresent(val -> config.put(String.format(HdfsConfig.FILE_READER_TYPE(), systemName), val));
+    consumerStagingDirectory.ifPresent(
         val -> config.put(String.format(HdfsConfig.STAGING_DIRECTORY(), systemName), val));
 
     return config;
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
index 0661139..8244504 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
@@ -20,11 +20,9 @@
 package org.apache.samza.system.hdfs.partitioner;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -33,7 +31,6 @@ import java.util.regex.Pattern;
 
 import javax.annotation.Nullable;
 
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.samza.Partition;
@@ -63,13 +60,13 @@ public class DirectoryPartitioner {
   private static final Logger LOG = LoggerFactory.getLogger(DirectoryPartitioner.class);
   private static final String GROUP_IDENTIFIER = "\\[id]";
 
-  private String whiteListRegex;
-  private String blackListRegex;
-  private String groupPattern;
-  private FileSystemAdapter fileSystemAdapter;
+  private final String whiteListRegex;
+  private final String blackListRegex;
+  private final String groupPattern;
+  private final FileSystemAdapter fileSystemAdapter;
 
   // stream name => partition => partition descriptor
-  private Map<String, Map<Partition, List<String>>> partitionDescriptorMap = new HashMap<>();
+  private final Map<String, Map<Partition, List<String>>> partitionDescriptorMap = new HashMap<>();
 
   public DirectoryPartitioner(String whiteList, String blackList, String groupPattern,
     FileSystemAdapter fileSystemAdapter) {
@@ -93,7 +90,7 @@ public class DirectoryPartitioner {
     allFiles.stream().filter(file -> file.getPath().matches(whiteListRegex) && !file.getPath().matches(blackListRegex))
       .forEach(filteredFiles::add);
     // sort the files to have a consistent order
-    filteredFiles.sort((f1, f2) -> f1.getPath().compareTo(f2.getPath()));
+    filteredFiles.sort(Comparator.comparing(FileMetadata::getPath));
     LOG.info(String.format("List of filtered files for %s: %s", streamName, filteredFiles));
     return filteredFiles;
   }
@@ -152,7 +149,7 @@ public class DirectoryPartitioner {
     List<List<FileMetadata>> ret = new ArrayList<>();
     // sort the map to guarantee consistent ordering
     List<String> sortedKeys = new ArrayList<>(map.keySet());
-    sortedKeys.sort(Comparator.<String>naturalOrder());
+    sortedKeys.sort(Comparator.naturalOrder());
     sortedKeys.stream().forEach(key -> ret.add(map.get(key)));
     return ret;
   }
@@ -175,13 +172,7 @@ public class DirectoryPartitioner {
       throw new SamzaException("The list of new files is not a super set of the old files. diff = "
         + oldFileSet.removeAll(newFileSet));
     }
-    Iterator<FileMetadata> iterator = newFileList.iterator();
-    while (iterator.hasNext()) {
-      FileMetadata file = iterator.next();
-      if (!oldFileSet.contains(file.getPath())) {
-        iterator.remove();
-      }
-    }
+    newFileList.removeIf(file -> !oldFileSet.contains(file.getPath()));
     return newFileList;
   }
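
The removeIf change above is the standard replacement for an explicit
Iterator removal loop. Stand-alone (file names are illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    public class RemoveIfSketch {
      public static void main(String[] args) {
        List<String> newFiles = new ArrayList<>(List.of("a.avro", "b.avro", "c.avro"));
        Set<String> oldFiles = Set.of("a.avro", "c.avro");

        // removeIf replaces the explicit Iterator loop: keep only files
        // that were already part of the old set.
        newFiles.removeIf(file -> !oldFiles.contains(file));

        System.out.println(newFiles); // [a.avro, c.avro]
      }
    }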
 
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java
index 5fec4bf..7e35eec 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java
@@ -33,11 +33,11 @@ public interface FileSystemAdapter {
    * @param streamName name of the stream
    * @return list of <code>FileMetadata</code> for all files associated to the given stream
    */
-  public List<FileMetadata> getAllFiles(String streamName);
+  List<FileMetadata> getAllFiles(String streamName);
 
-  public class FileMetadata {
-    private String path;
-    private long length;
+  class FileMetadata {
+    private final String path;
+    private final long length;
 
     public FileMetadata(String path, long length) {
       this.path = path;
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
index eea68bb..cd8cc5e 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
@@ -46,7 +46,7 @@ public class MultiFileHdfsReader {
 
   private final HdfsReaderFactory.ReaderType readerType;
   private final SystemStreamPartition systemStreamPartition;
-  private List<String> filePaths;
+  private final List<String> filePaths;
   private SingleFileHdfsReader curReader;
   private int curFileIndex = 0;
   private String curSingleFileOffset;
@@ -127,7 +127,7 @@ public class MultiFileHdfsReader {
     this.filePaths = partitionDescriptors;
     this.numMaxRetries = numMaxRetries;
     this.numRetries = 0;
-    if (partitionDescriptors.size() <= 0) {
+    if (partitionDescriptors.isEmpty()) {
       throw new SamzaException(
         "Invalid number of files based on partition descriptors: " + partitionDescriptors.size());
     }
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java
index eb8a70d..cbae032 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java
@@ -28,35 +28,35 @@ public interface SingleFileHdfsReader {
    * @param path path of the file to be read
    * @param offset offset the reader should start from
    */
-  public void open(String path, String offset);
+  void open(String path, String offset);
 
   /**
    * Seek to a specific offset
    * @param offset offset the reader should seek to
    */
-  public void seek(String offset);
+  void seek(String offset);
 
   /**
    * Construct and return the next message envelope
    * @return constructed IncomeMessageEnvelope
    */
-  public IncomingMessageEnvelope readNext();
+  IncomingMessageEnvelope readNext();
 
   /**
    * Get the next offset, which is the offset for the next message
    * that will be returned by readNext
    * @return next offset
    */
-  public String nextOffset();
+  String nextOffset();
 
   /**
    * Whether there are still records to be read
    * @return true of false based on whether the reader has hit end of file
    */
-  public boolean hasNext();
+  boolean hasNext();
 
   /**
    * Close the reader.
    */
-  public void close();
+  void close();
 }
diff --git a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
index 21709be..dea60b3 100644
--- a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
+++ b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
@@ -71,8 +71,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
 
     final String groupId = createConsumerGroupId(config);
 
-    Map<String, Object> consumerProps = new HashMap<>();
-    consumerProps.putAll(subConf);
+    Map<String, Object> consumerProps = new HashMap<>(subConf);
 
     consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
     consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
@@ -115,8 +114,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
     }
 
     // Override default max poll config if there is no value
-    consumerProps.computeIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
-        (k) -> DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS);
+    consumerProps.putIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS);
 
     return new KafkaConsumerConfig(consumerProps, systemName);
   }
@@ -194,7 +192,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
         return autoOffsetReset;
       }
       // translate old kafka consumer values into new ones (SAMZA-1987 top remove it)
-      String newAutoOffsetReset = null;
+      String newAutoOffsetReset;
       switch (autoOffsetReset) {
         case "largest":
           newAutoOffsetReset = KAFKA_OFFSET_LATEST;
@@ -230,4 +228,4 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
 
     return newAutoOffsetReset;
   }
-}
\ No newline at end of file
+}
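
On the putIfAbsent change above: computeIfAbsent takes a function and is worth
it only when the default is expensive to build; for a constant, putIfAbsent
says the same thing more directly. A quick comparison (keys and values are
illustrative):

    import java.util.HashMap;
    import java.util.Map;

    public class PutIfAbsentSketch {
      public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("max.poll.records", 500);

        // putIfAbsent: pass the default directly; nothing happens if a
        // (non-null) mapping already exists.
        props.putIfAbsent("max.poll.records", 100);       // kept at 500
        props.putIfAbsent("auto.offset.reset", "latest"); // inserted

        // computeIfAbsent buys lazy evaluation, which only matters when
        // the default is expensive to compute -- overkill for a constant.
        props.computeIfAbsent("group.id", k -> "generated-" + k.hashCode());

        System.out.println(props);
      }
    }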
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
index 113dced..d621308 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
  * Extends StreamSpec with the ability to easily get the topic replication factor.
  */
 public class KafkaStreamSpec extends StreamSpec {
-  private static Logger LOG = LoggerFactory.getLogger(KafkaStreamSpec.class);
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamSpec.class);
 
   private static final int DEFAULT_REPLICATION_FACTOR = 2;
 
@@ -59,10 +59,8 @@ public class KafkaStreamSpec extends StreamSpec {
    * @return            The Map instance.
    */
   private static Map<String, String> propertiesToMap(Properties properties) {
-    Map<String, String> map = new HashMap<String, String>();
-    for (final String name: properties.stringPropertyNames()) {
-      map.put(name, properties.getProperty(name));
-    }
+    Map<String, String> map = new HashMap<>();
+    properties.stringPropertyNames().forEach(name -> map.put(name, properties.getProperty(name)));
     return map;
   }
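
For comparison, the forEach version above could also be written as a single
stream expression; whether that reads better is a matter of taste (a sketch,
not what the commit chose):

    import java.util.Map;
    import java.util.Properties;
    import java.util.stream.Collectors;

    public class PropertiesToMapSketch {
      static Map<String, String> propertiesToMap(Properties properties) {
        // Collect each property name to its value; equivalent to the forEach loop.
        return properties.stringPropertyNames().stream()
            .collect(Collectors.toMap(name -> name, properties::getProperty));
      }

      public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("replication.factor", "2");
        System.out.println(propertiesToMap(props)); // {replication.factor=2}
      }
    }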
 
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index 36aa695..f4db408 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -244,9 +244,7 @@ public class KafkaSystemAdmin implements SystemAdmin {
               List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(streamName);
               LOG.debug("Stream {} has partitions {}", streamName, partitionInfos);
 
-              partitionInfos.forEach(partitionInfo -> {
-                partitionMetadata.put(new Partition(partitionInfo.partition()), dummySspm);
-              });
+              partitionInfos.forEach(partitionInfo -> partitionMetadata.put(new Partition(partitionInfo.partition()), dummySspm));
 
               allMetadata.put(streamName, new SystemStreamMetadata(streamName, partitionMetadata));
             });
@@ -370,9 +368,7 @@ public class KafkaSystemAdmin implements SystemAdmin {
           }
         };
 
-    Map<String, SystemStreamMetadata> result =
-        retryBackoff.run(fetchMetadataOperation, onExceptionRetryOperation).getOrElse(fallbackOperation);
-    return result;
+    return retryBackoff.run(fetchMetadataOperation, onExceptionRetryOperation).getOrElse(fallbackOperation);
   }
 
   /**
@@ -401,9 +397,7 @@ public class KafkaSystemAdmin implements SystemAdmin {
     Map<TopicPartition, Long> upcomingOffsetsWithLong = metadataConsumer.endOffsets(topicPartitions);
     LOG.debug("Kafka-fetched endOffsets: {}", upcomingOffsetsWithLong);
 
-    oldestOffsetsWithLong.forEach((topicPartition, offset) -> {
-      oldestOffsets.put(toSystemStreamPartition(topicPartition), String.valueOf(offset));
-    });
+    oldestOffsetsWithLong.forEach((topicPartition, offset) -> oldestOffsets.put(toSystemStreamPartition(topicPartition), String.valueOf(offset)));
 
     upcomingOffsetsWithLong.forEach((topicPartition, offset) -> {
       upcomingOffsets.put(toSystemStreamPartition(topicPartition), String.valueOf(offset));
@@ -511,7 +505,7 @@ public class KafkaSystemAdmin implements SystemAdmin {
     NewTopic newTopic = new NewTopic(topicName, kSpec.getPartitionCount(), (short) kSpec.getReplicationFactor());
 
     // specify the configs
-    Map<String, String> streamConfig = new HashMap(streamSpec.getConfig());
+    Map<String, String> streamConfig = new HashMap<>(streamSpec.getConfig());
     // HACK - replication.factor is invalid config for AdminClient.createTopics
     if (streamConfig.containsKey(REPL_FACTOR)) {
       String repl = streamConfig.get(REPL_FACTOR);
@@ -561,7 +555,7 @@ public class KafkaSystemAdmin implements SystemAdmin {
   }
 
   /**
-   * Converts a StreamSpec into a KafakStreamSpec. Special handling for coordinator and changelog stream.
+   * Converts a StreamSpec into a KafkaStreamSpec. Special handling for coordinator and changelog stream.
    * @param spec a StreamSpec object
    * @return KafkaStreamSpec object
    */
@@ -615,7 +609,7 @@ public class KafkaSystemAdmin implements SystemAdmin {
 
   // get partition info for topic
   Map<String, List<PartitionInfo>> getTopicMetadata(Set<String> topics) {
-    Map<String, List<PartitionInfo>> streamToPartitionsInfo = new HashMap();
+    Map<String, List<PartitionInfo>> streamToPartitionsInfo = new HashMap<>();
     List<PartitionInfo> partitionInfoList;
     for (String topic : topics) {
       partitionInfoList = metadataConsumer.partitionsFor(topic);
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java
index d9477e5..54f433f 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java
@@ -71,7 +71,7 @@ public class KafkaInputDescriptor<StreamMessageType>
    * @return this input descriptor
    */
   public KafkaInputDescriptor<StreamMessageType> withConsumerAutoOffsetReset(String consumerAutoOffsetReset) {
-    this.consumerAutoOffsetResetOptional = Optional.of(StringUtils.stripToNull(consumerAutoOffsetReset));
+    this.consumerAutoOffsetResetOptional = Optional.ofNullable(StringUtils.stripToNull(consumerAutoOffsetReset));
     return this;
   }
 
@@ -94,7 +94,7 @@ public class KafkaInputDescriptor<StreamMessageType>
 
   @Override
   public Map<String, String> toConfig() {
-    HashMap<String, String> configs = new HashMap<>(super.toConfig());
+    Map<String, String> configs = new HashMap<>(super.toConfig());
     // Note: Kafka configuration needs the topic's physical name, not the stream-id.
     // We won't have that here if user only specified it in configs, or if it got rewritten
     // by the planner to something different than what's in this descriptor.
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java
index 091c21a..8c4d48b 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java
@@ -120,7 +120,7 @@ public class KafkaSystemDescriptor extends SystemDescriptor<KafkaSystemDescripto
    * @return this system descriptor
    */
   public KafkaSystemDescriptor withConsumerAutoOffsetReset(String consumerAutoOffsetReset) {
-    this.consumerAutoOffsetResetOptional = Optional.of(StringUtils.stripToNull(consumerAutoOffsetReset));
+    this.consumerAutoOffsetResetOptional = Optional.ofNullable(StringUtils.stripToNull(consumerAutoOffsetReset));
     return this;
   }
 
@@ -246,4 +246,4 @@ public class KafkaSystemDescriptor extends SystemDescriptor<KafkaSystemDescripto
     producerConfigs.forEach((key, value) -> configs.put(String.format(PRODUCER_CONFIGS_CONFIG_KEY, getSystemName(), key), value));
     return configs;
   }
-}
\ No newline at end of file
+}
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 93b1ab2..ac0a55c 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -57,7 +57,7 @@ public class KafkaConsumerProxy<K, V> {
 
   private static final int SLEEP_MS_WHILE_NO_TOPIC_PARTITION = 100;
 
-  final Thread consumerPollThread;
+  private final Thread consumerPollThread;
   private final Consumer<K, V> kafkaConsumer;
   private final KafkaSystemConsumer.KafkaConsumerMessageSink sink;
   private final KafkaSystemConsumerMetrics kafkaConsumerMetrics;
@@ -187,7 +187,7 @@ public class KafkaConsumerProxy<K, V> {
 
   // creates a separate thread for getting the messages.
   private Runnable createProxyThreadRunnable() {
-    Runnable runnable = () -> {
+    return () -> {
       isRunning = true;
 
       try {
@@ -208,8 +208,6 @@ public class KafkaConsumerProxy<K, V> {
         LOG.info("KafkaConsumerProxy for system {} has stopped.", systemName);
       }
     };
-
-    return runnable;
   }
 
   private void fetchMessages() {
@@ -305,11 +303,7 @@ public class KafkaConsumerProxy<K, V> {
       updateMetrics(record, tp);
 
       SystemStreamPartition ssp = topicPartitionToSSP.get(tp);
-      List<IncomingMessageEnvelope> messages = results.get(ssp);
-      if (messages == null) {
-        messages = new ArrayList<>();
-        results.put(ssp, messages);
-      }
+      List<IncomingMessageEnvelope> messages = results.computeIfAbsent(ssp, k -> new ArrayList<>());
 
       IncomingMessageEnvelope incomingMessageEnvelope = handleNewRecord(record, ssp);
       messages.add(incomingMessageEnvelope);
@@ -359,7 +353,7 @@ public class KafkaConsumerProxy<K, V> {
     if (lag == null) {
       throw new SamzaException("Unknown/unregistered ssp in latestLags. ssp=" + ssp + "; system=" + systemName);
     }
-    long currentSSPLag = lag.longValue(); // lag between the current offset and the highwatermark
+    long currentSSPLag = lag; // lag between the current offset and the highwatermark
     if (currentSSPLag < 0) {
       return;
     }
@@ -397,7 +391,7 @@ public class KafkaConsumerProxy<K, V> {
 
     // populate the MetricNames first time
     if (perPartitionMetrics.isEmpty()) {
-      HashMap<String, String> tags = new HashMap<>();
+      Map<String, String> tags = new HashMap<>();
       tags.put("client-id", clientId); // this is required by the KafkaConsumer to get the metrics
 
       for (SystemStreamPartition ssp : ssps) {
@@ -429,10 +423,10 @@ public class KafkaConsumerProxy<K, V> {
       Long lag = latestLags.get(ssp);
       LOG.trace("Latest offset of {} is  {}; lag = {}", ssp, offset, lag);
       if (lag != null && offset != null && lag >= 0) {
-        long streamEndOffset = offset.longValue() + lag.longValue();
+        long streamEndOffset = offset + lag;
         // update the metrics
         kafkaConsumerMetrics.setHighWatermarkValue(tp, streamEndOffset);
-        kafkaConsumerMetrics.setLagValue(tp, lag.longValue());
+        kafkaConsumerMetrics.setLagValue(tp, lag);
       }
     }
   }
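
The computeIfAbsent change in the poll-results hunk above is the usual
grouping idiom: the get/null-check/put sequence it replaces creates the list
only on first sight of each key, which computeIfAbsent does in one call.
Stand-alone, with illustrative types in place of SystemStreamPartition and
IncomingMessageEnvelope:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class GroupingSketch {
      public static void main(String[] args) {
        Map<String, List<Integer>> results = new HashMap<>();

        // computeIfAbsent collapses the get/null-check/put dance into one
        // call: the list is created only for the first record of each key.
        for (int record : new int[] {1, 2, 3, 4, 5}) {
          String key = (record % 2 == 0) ? "even" : "odd";
          List<Integer> messages = results.computeIfAbsent(key, k -> new ArrayList<>());
          messages.add(record);
        }

        System.out.println(results); // {even=[2, 4], odd=[1, 3, 5]}
      }
    }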