Posted to commits@samza.apache.org by ja...@apache.org on 2019/03/20 19:52:10 UTC
[samza] branch master updated: SAMZA-2105: [ELASTICSEARCH, HDFS, KAFKA] code cleanup and refactoring
This is an automated email from the ASF dual-hosted git repository.
jagadish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 380c976 SAMZA-2105: [ELASTICSEARCH, HDFS, KAFKA] code cleanup and refactoring
380c976 is described below
commit 380c9762e683d2c755b7d85f484e338d846132a7
Author: strkkk <an...@gmail.com>
AuthorDate: Wed Mar 20 12:52:01 2019 -0700
SAMZA-2105: [ELASTICSEARCH, HDFS, KAFKA] code cleanup and refactoring
1. Added/removed modifiers (fields made final, redundant public dropped from interface members)
2. Replaced Guava Optional with java.util.Optional and removed Guava from the samza-elasticsearch module dependencies. Guava's Optional was introduced in https://issues.apache.org/jira/browse/SAMZA-853, but it is not clear why it would be preferable to the standard JDK API (a minimal sketch of the migration pattern follows below).
3. Simplified a few code snippets
Author: strkkk <an...@gmail.com>
Reviewers: Jagadish <ja...@apache.org>
Closes #921 from strkkk/hdfs_es_kfka
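
Note for readers skimming the diff: the core of item 2 is the mechanical swap from com.google.common.base.Optional to java.util.Optional. A minimal, self-contained sketch of the pattern (class and key names here are illustrative, not from the Samza codebase):

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class OptionalMigrationSketch {
  private static final Map<String, String> CONFIG = new HashMap<>();

  // Before (Guava): containsKey + Optional.of / Optional.absent().
  // After (JDK): a single Optional.ofNullable over the raw lookup.
  static Optional<String> get(String key) {
    return Optional.ofNullable(CONFIG.get(key));
  }

  public static void main(String[] args) {
    CONFIG.put("transport.host", "localhost");
    // ifPresent replaces the isPresent()/get() pair used with Guava.
    get("transport.host").ifPresent(host -> System.out.println("host = " + host));
    System.out.println(get("transport.port").isPresent()); // false
  }
}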
---
build.gradle | 1 -
.../apache/samza/config/ElasticsearchConfig.java | 22 ++++-------
.../system/elasticsearch/BulkProcessorFactory.java | 16 ++++----
.../elasticsearch/ElasticsearchSystemProducer.java | 2 +-
.../client/TransportClientFactory.java | 17 ++-------
.../indexrequest/DefaultIndexRequestFactory.java | 41 +++++---------------
.../apache/samza/system/hdfs/HdfsSystemAdmin.java | 8 ++--
.../samza/system/hdfs/HdfsSystemConsumer.java | 15 +++-----
.../hdfs/descriptors/HdfsSystemDescriptor.java | 44 +++++++++++-----------
.../hdfs/partitioner/DirectoryPartitioner.java | 25 ++++--------
.../system/hdfs/partitioner/FileSystemAdapter.java | 8 ++--
.../system/hdfs/reader/MultiFileHdfsReader.java | 4 +-
.../system/hdfs/reader/SingleFileHdfsReader.java | 12 +++---
.../apache/samza/config/KafkaConsumerConfig.java | 10 ++---
.../apache/samza/system/kafka/KafkaStreamSpec.java | 8 ++--
.../samza/system/kafka/KafkaSystemAdmin.java | 18 +++------
.../kafka/descriptors/KafkaInputDescriptor.java | 4 +-
.../kafka/descriptors/KafkaSystemDescriptor.java | 4 +-
.../samza/system/kafka/KafkaConsumerProxy.java | 20 ++++------
19 files changed, 104 insertions(+), 175 deletions(-)
diff --git a/build.gradle b/build.gradle
index 29fba39..f5d9f51 100644
--- a/build.gradle
+++ b/build.gradle
@@ -306,7 +306,6 @@ project(":samza-elasticsearch_$scalaSuffix") {
compile project(':samza-api')
compile project(":samza-core_$scalaSuffix")
compile "org.elasticsearch:elasticsearch:$elasticsearchVersion"
- compile "com.google.guava:guava:$guavaVersion"
compile "org.slf4j:slf4j-api:$slf4jVersion"
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-core:$mockitoVersion"
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java b/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java
index 75bf4c7..b062e24 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java
@@ -20,7 +20,7 @@
package org.apache.samza.config;
import org.apache.samza.SamzaException;
-import com.google.common.base.Optional;
+import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,27 +67,19 @@ public class ElasticsearchConfig extends MapConfig {
// Index Request
public Optional<String> getIndexRequestFactoryClassName() {
- if (containsKey(CONFIG_KEY_INDEX_REQUEST_FACTORY)) {
- return Optional.of(get(CONFIG_KEY_INDEX_REQUEST_FACTORY));
- } else {
- return Optional.absent();
- }
+ return Optional.ofNullable(get(CONFIG_KEY_INDEX_REQUEST_FACTORY));
}
// Transport client settings
public Optional<String> getTransportHost() {
- if (containsKey(CONFIG_KEY_CLIENT_TRANSPORT_HOST)) {
- return Optional.of(get(CONFIG_KEY_CLIENT_TRANSPORT_HOST));
- } else {
- return Optional.absent();
- }
+ return Optional.ofNullable(get(CONFIG_KEY_CLIENT_TRANSPORT_HOST));
}
public Optional<Integer> getTransportPort() {
if (containsKey(CONFIG_KEY_CLIENT_TRANSPORT_PORT)) {
return Optional.of(getInt(CONFIG_KEY_CLIENT_TRANSPORT_PORT));
} else {
- return Optional.absent();
+ return Optional.empty();
}
}
@@ -96,7 +88,7 @@ public class ElasticsearchConfig extends MapConfig {
if (containsKey(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
return Optional.of(getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
} else {
- return Optional.absent();
+ return Optional.empty();
}
}
@@ -104,7 +96,7 @@ public class ElasticsearchConfig extends MapConfig {
if (containsKey(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
return Optional.of(getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB));
} else {
- return Optional.absent();
+ return Optional.empty();
}
}
@@ -112,7 +104,7 @@ public class ElasticsearchConfig extends MapConfig {
if (containsKey(CONFIG_KEY_BULK_FLUSH_INTERVALS_MS)) {
return Optional.of(getInt(CONFIG_KEY_BULK_FLUSH_INTERVALS_MS));
} else {
- return Optional.absent();
+ return Optional.empty();
}
}
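
The Integer-returning getters above keep their containsKey/if-else shape because getInt is only safe once the key is known to exist. A hedged sketch of how those too could collapse to one expression by parsing the raw string lazily (method and key names are illustrative, not a change the commit makes):

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class IntConfigSketch {
  private final Map<String, String> values = new HashMap<>();

  // A missing key short-circuits to empty before the parse runs,
  // so no containsKey guard is needed.
  public Optional<Integer> getBulkFlushMaxActions() {
    return Optional.ofNullable(values.get("bulk.flush.max.actions")).map(Integer::parseInt);
  }

  public static void main(String[] args) {
    IntConfigSketch config = new IntConfigSketch();
    System.out.println(config.getBulkFlushMaxActions()); // Optional.empty
    config.values.put("bulk.flush.max.actions", "1000");
    System.out.println(config.getBulkFlushMaxActions()); // Optional[1000]
  }
}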
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/BulkProcessorFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/BulkProcessorFactory.java
index 0027531..2ceb899 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/BulkProcessorFactory.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/BulkProcessorFactory.java
@@ -43,15 +43,13 @@ public class BulkProcessorFactory {
// This also means BulkProcessor#flush() is blocking as is also required.
builder.setConcurrentRequests(0);
- if (config.getBulkFlushMaxActions().isPresent()) {
- builder.setBulkActions(config.getBulkFlushMaxActions().get());
- }
- if (config.getBulkFlushMaxSizeMB().isPresent()) {
- builder.setBulkSize(new ByteSizeValue(config.getBulkFlushMaxSizeMB().get(), ByteSizeUnit.MB));
- }
- if (config.getBulkFlushIntervalMS().isPresent()) {
- builder.setFlushInterval(TimeValue.timeValueMillis(config.getBulkFlushIntervalMS().get()));
- }
+ config.getBulkFlushMaxActions().ifPresent(builder::setBulkActions);
+ config.getBulkFlushMaxSizeMB().ifPresent(size ->
+ builder.setBulkSize(new ByteSizeValue(size, ByteSizeUnit.MB))
+ );
+ config.getBulkFlushIntervalMS().ifPresent(interval ->
+ builder.setFlushInterval(TimeValue.timeValueMillis(interval))
+ );
return builder.build();
}
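
The ifPresent idiom used above, in isolation — a runnable sketch with a stand-in builder (the real BulkProcessor.Builder takes ByteSizeValue/TimeValue arguments, elided here):

import java.util.Optional;

public class BuilderSketch {
  static class Builder {
    Builder setBulkActions(int n) { System.out.println("bulkActions=" + n); return this; }
    Builder setFlushIntervalMs(long ms) { System.out.println("flushIntervalMs=" + ms); return this; }
  }

  public static void main(String[] args) {
    Optional<Integer> maxActions = Optional.of(500);
    Optional<Long> flushIntervalMs = Optional.empty();
    Builder builder = new Builder();
    // Each setting is applied only when configured, replacing the
    // isPresent()/get() pairs removed in the hunk above.
    maxActions.ifPresent(builder::setBulkActions);
    flushIntervalMs.ifPresent(builder::setFlushIntervalMs); // no-op here
  }
}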
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
index 7672ee8..001dabf 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
@@ -67,7 +67,7 @@ public class ElasticsearchSystemProducer implements SystemProducer {
private final BulkProcessorFactory bulkProcessorFactory;
private final ElasticsearchSystemProducerMetrics metrics;
- private Client client;
+ private final Client client;
public ElasticsearchSystemProducer(String system, BulkProcessorFactory bulkProcessorFactory,
Client client, IndexRequestFactory indexRequestFactory,
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/TransportClientFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/TransportClientFactory.java
index e336ad9..5920b8f 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/TransportClientFactory.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/TransportClientFactory.java
@@ -47,20 +47,11 @@ public class TransportClientFactory implements ClientFactory {
public TransportClientFactory(ElasticsearchConfig config) {
clientSettings = config.getElasticseachSettings();
- if (config.getTransportHost().isPresent()) {
- transportHost = config.getTransportHost().get();
- } else {
- throw new SamzaException("You must specify the transport host for TransportClientFactory"
- + "with the Elasticsearch system.");
- }
-
- if (config.getTransportPort().isPresent()) {
- transportPort = config.getTransportPort().get();
- } else {
- throw new SamzaException("You must specify the transport port for TransportClientFactory"
- + "with the Elasticsearch system.");
- }
+ transportHost = config.getTransportHost().orElseThrow(() ->
+ new SamzaException("You must specify the transport host for TransportClientFactory with the Elasticsearch system."));
+ transportPort = config.getTransportPort().orElseThrow(() ->
+ new SamzaException("You must specify the transport port for TransportClientFactory with the Elasticsearch system."));
}
@Override
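
The orElseThrow pattern from this hunk, as a standalone sketch (using IllegalStateException in place of SamzaException so it compiles without Samza on the classpath):

import java.util.Optional;

public class OrElseThrowSketch {
  public static void main(String[] args) {
    Optional<String> host = Optional.of("localhost");
    // orElseThrow collapses the isPresent()/get()/else-throw block into one
    // expression; the exception supplier only runs on the empty path.
    String value = host.orElseThrow(() ->
        new IllegalStateException("You must specify the transport host."));
    System.out.println(value); // localhost
  }
}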
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
index 7f1e884..7befb3f 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
@@ -23,10 +23,10 @@ import org.apache.samza.SamzaException;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
-import com.google.common.base.Optional;
import org.elasticsearch.index.VersionType;
import java.util.Map;
+import java.util.Optional;
/**
* The default {@link IndexRequestFactory}.
@@ -58,25 +58,10 @@ public class DefaultIndexRequestFactory implements IndexRequestFactory {
public IndexRequest getIndexRequest(OutgoingMessageEnvelope envelope) {
IndexRequest indexRequest = getRequest(envelope);
- Optional<String> id = getId(envelope);
- if (id.isPresent()) {
- indexRequest.id(id.get());
- }
-
- Optional<String> routingKey = getRoutingKey(envelope);
- if (routingKey.isPresent()) {
- indexRequest.routing(routingKey.get());
- }
-
- Optional<Long> version = getVersion(envelope);
- if (version.isPresent()) {
- indexRequest.version(version.get());
- }
-
- Optional<VersionType> versionType = getVersionType(envelope);
- if (versionType.isPresent()) {
- indexRequest.versionType(versionType.get());
- }
+ getId(envelope).ifPresent(indexRequest::id);
+ getRoutingKey(envelope).ifPresent(indexRequest::routing);
+ getVersion(envelope).ifPresent(indexRequest::version);
+ getVersionType(envelope).ifPresent(indexRequest::versionType);
setSource(envelope, indexRequest);
@@ -94,27 +79,19 @@ public class DefaultIndexRequestFactory implements IndexRequestFactory {
}
protected Optional<String> getId(OutgoingMessageEnvelope envelope) {
- Object id = envelope.getKey();
- if (id == null) {
- return Optional.absent();
- }
- return Optional.of(id.toString());
+ return Optional.ofNullable(envelope.getKey()).map(Object::toString);
}
protected Optional<String> getRoutingKey(OutgoingMessageEnvelope envelope) {
- Object partitionKey = envelope.getPartitionKey();
- if (partitionKey == null) {
- return Optional.absent();
- }
- return Optional.of(partitionKey.toString());
+ return Optional.ofNullable(envelope.getPartitionKey()).map(Object::toString);
}
protected Optional<Long> getVersion(OutgoingMessageEnvelope envelope) {
- return Optional.absent();
+ return Optional.empty();
}
protected Optional<VersionType> getVersionType(OutgoingMessageEnvelope envelope) {
- return Optional.absent();
+ return Optional.empty();
}
protected void setSource(OutgoingMessageEnvelope envelope, IndexRequest indexRequest) {
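
The ofNullable(...).map(...) chain used for getId/getRoutingKey, in isolation (getKey here is a stand-in for OutgoingMessageEnvelope#getKey, which may return null):

import java.util.Optional;

public class MapChainSketch {
  static Object getKey() { return 42; }

  public static void main(String[] args) {
    // map only runs when a key is present, so the null check and the
    // toString conversion collapse into one pipeline.
    Optional<String> id = Optional.ofNullable(getKey()).map(Object::toString);
    System.out.println(id); // Optional[42]
  }
}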
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
index 0d50f26..7ffbfc7 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
@@ -91,10 +91,10 @@ import org.slf4j.LoggerFactory;
public class HdfsSystemAdmin implements SystemAdmin {
private static final Logger LOG = LoggerFactory.getLogger(HdfsSystemAdmin.class);
- private HdfsConfig hdfsConfig;
- private DirectoryPartitioner directoryPartitioner;
- private String stagingDirectory; // directory that contains the partition description
- private HdfsReaderFactory.ReaderType readerType;
+ private final HdfsConfig hdfsConfig;
+ private final DirectoryPartitioner directoryPartitioner;
+ private final String stagingDirectory; // directory that contains the partition description
+ private final HdfsReaderFactory.ReaderType readerType;
public HdfsSystemAdmin(String systemName, Config config) {
hdfsConfig = new HdfsConfig(config);
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
index 92457ab..1ceb5d6 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
@@ -22,7 +22,6 @@ package org.apache.samza.system.hdfs;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
@@ -103,9 +102,9 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap {
* (stream2) -> (P0) -> "hdfs://user/samzauser/2/datafile01.avro"
* ...
*/
- private LoadingCache<String, Map<Partition, List<String>>> cachedPartitionDescriptorMap;
- private Map<SystemStreamPartition, MultiFileHdfsReader> readers;
- private Map<SystemStreamPartition, Future> readerRunnableStatus;
+ private final LoadingCache<String, Map<Partition, List<String>>> cachedPartitionDescriptorMap;
+ private final Map<SystemStreamPartition, MultiFileHdfsReader> readers;
+ private final Map<SystemStreamPartition, Future> readerRunnableStatus;
/**
* Whether the {@link org.apache.samza.system.hdfs.HdfsSystemConsumer} is notified
@@ -130,8 +129,7 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap {
this.consumerMetrics = consumerMetrics;
cachedPartitionDescriptorMap = CacheBuilder.newBuilder().build(new CacheLoader<String, Map<Partition, List<String>>>() {
@Override
- public Map<Partition, List<String>> load(String streamName)
- throws Exception {
+ public Map<Partition, List<String>> load(String streamName) {
Validate.notEmpty(streamName);
if (StringUtils.isBlank(stagingDirectory)) {
throw new SamzaException("Staging directory can't be empty. "
@@ -151,8 +149,7 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap {
public void start() {
LOG.info(String.format("HdfsSystemConsumer started with %d readers", readers.size()));
executorService = Executors.newCachedThreadPool();
- readers.entrySet().forEach(
- entry -> readerRunnableStatus.put(entry.getKey(), executorService.submit(new ReaderRunnable(entry.getValue()))));
+ readers.forEach((key, value) -> readerRunnableStatus.put(key, executorService.submit(new ReaderRunnable(value))));
}
/**
@@ -275,7 +272,7 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap {
}
private class ReaderRunnable implements Runnable {
- public MultiFileHdfsReader reader;
+ public final MultiFileHdfsReader reader;
public ReaderRunnable(MultiFileHdfsReader reader) {
this.reader = reader;
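
The Map.forEach change from the start() hunk above, as a standalone sketch (stand-in types for the reader map):

import java.util.HashMap;
import java.util.Map;

public class MapForEachSketch {
  public static void main(String[] args) {
    Map<String, Integer> readers = new HashMap<>();
    readers.put("partition-0", 1);
    readers.put("partition-1", 2);
    Map<String, Integer> status = new HashMap<>();
    // Map.forEach hands over key and value directly, avoiding the
    // entry.getKey()/entry.getValue() noise of entrySet().forEach.
    readers.forEach((ssp, reader) -> status.put(ssp, reader * 10));
    System.out.println(status);
  }
}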
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsSystemDescriptor.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsSystemDescriptor.java
index f4d8566..fd63f79 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsSystemDescriptor.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsSystemDescriptor.java
@@ -92,7 +92,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor>
* @return this system descriptor
*/
public HdfsSystemDescriptor withDatePathFormat(String datePathFormat) {
- this.datePathFormat = Optional.of(StringUtils.stripToNull(datePathFormat));
+ this.datePathFormat = Optional.ofNullable(StringUtils.stripToNull(datePathFormat));
return this;
}
@@ -102,7 +102,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor>
* @return this system descriptor
*/
public HdfsSystemDescriptor withOutputBaseDir(String outputBaseDir) {
- this.outputBaseDir = Optional.of(StringUtils.stripToNull(outputBaseDir));
+ this.outputBaseDir = Optional.ofNullable(StringUtils.stripToNull(outputBaseDir));
return this;
}
@@ -135,7 +135,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor>
* @return this system descriptor
*/
public HdfsSystemDescriptor withWriteCompressionType(String writeCompressionType) {
- this.writeCompressionType = Optional.of(StringUtils.stripToNull(writeCompressionType));
+ this.writeCompressionType = Optional.ofNullable(StringUtils.stripToNull(writeCompressionType));
return this;
}
@@ -145,7 +145,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor>
* @return this system descriptor
*/
public HdfsSystemDescriptor withWriterClassName(String writerClassName) {
- this.writerClass = Optional.of(StringUtils.stripToNull(writerClassName));
+ this.writerClass = Optional.ofNullable(StringUtils.stripToNull(writerClassName));
return this;
}
@@ -175,7 +175,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor>
* @return this system descriptor
*/
public HdfsSystemDescriptor withConsumerWhiteList(String whiteList) {
- this.consumerWhiteList = Optional.of(StringUtils.stripToNull(whiteList));
+ this.consumerWhiteList = Optional.ofNullable(StringUtils.stripToNull(whiteList));
return this;
}
@@ -185,7 +185,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor>
* @return this system descriptor
*/
public HdfsSystemDescriptor withConsumerBlackList(String blackList) {
- this.consumerBlackList = Optional.of(StringUtils.stripToNull(blackList));
+ this.consumerBlackList = Optional.ofNullable(StringUtils.stripToNull(blackList));
return this;
}
@@ -195,7 +195,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor>
* @return this system descriptor
*/
public HdfsSystemDescriptor withConsumerGroupPattern(String groupPattern) {
- this.consumerGroupPattern = Optional.of(StringUtils.stripToNull(groupPattern));
+ this.consumerGroupPattern = Optional.ofNullable(StringUtils.stripToNull(groupPattern));
return this;
}
@@ -205,7 +205,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor>
* @return this system descriptor
*/
public HdfsSystemDescriptor withReaderType(String readerType) {
- this.consumerReader = Optional.of(StringUtils.stripToNull(readerType));
+ this.consumerReader = Optional.ofNullable(StringUtils.stripToNull(readerType));
return this;
}
@@ -216,7 +216,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor>
* @return this system descriptor
*/
public HdfsSystemDescriptor withStagingDirectory(String stagingDirectory) {
- this.consumerStagingDirectory = Optional.of(StringUtils.stripToNull(stagingDirectory));
+ this.consumerStagingDirectory = Optional.ofNullable(StringUtils.stripToNull(stagingDirectory));
return this;
}
@@ -225,29 +225,29 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor>
Map<String, String> config = new HashMap<>(super.toConfig());
String systemName = getSystemName();
- this.datePathFormat.ifPresent(
+ datePathFormat.ifPresent(
val -> config.put(String.format(HdfsConfig.DATE_PATH_FORMAT_STRING(), systemName), val));
- this.outputBaseDir.ifPresent(val -> config.put(String.format(HdfsConfig.BASE_OUTPUT_DIR(), systemName), val));
- this.writeBatchSizeBytes.ifPresent(
+ outputBaseDir.ifPresent(val -> config.put(String.format(HdfsConfig.BASE_OUTPUT_DIR(), systemName), val));
+ writeBatchSizeBytes.ifPresent(
val -> config.put(String.format(HdfsConfig.WRITE_BATCH_SIZE_BYTES(), systemName), String.valueOf(val)));
- this.writeBatchSizeRecords.ifPresent(
+ writeBatchSizeRecords.ifPresent(
val -> config.put(String.format(HdfsConfig.WRITE_BATCH_SIZE_RECORDS(), systemName), String.valueOf(val)));
- this.writeCompressionType.ifPresent(
+ writeCompressionType.ifPresent(
val -> config.put(String.format(HdfsConfig.COMPRESSION_TYPE(), systemName), val));
- this.writerClass.ifPresent(val -> config.put(String.format(HdfsConfig.HDFS_WRITER_CLASS_NAME(), systemName), val));
+ writerClass.ifPresent(val -> config.put(String.format(HdfsConfig.HDFS_WRITER_CLASS_NAME(), systemName), val));
- this.consumerBufferCapacity.ifPresent(
+ consumerBufferCapacity.ifPresent(
val -> config.put(String.format(HdfsConfig.CONSUMER_BUFFER_CAPACITY(), systemName), String.valueOf(val)));
- this.consumerMaxRetries.ifPresent(
+ consumerMaxRetries.ifPresent(
val -> config.put(String.format(HdfsConfig.CONSUMER_NUM_MAX_RETRIES(), systemName), String.valueOf(val)));
- this.consumerWhiteList.ifPresent(
+ consumerWhiteList.ifPresent(
val -> config.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST(), systemName), val));
- this.consumerBlackList.ifPresent(
+ consumerBlackList.ifPresent(
val -> config.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_BLACKLIST(), systemName), val));
- this.consumerGroupPattern.ifPresent(
+ consumerGroupPattern.ifPresent(
val -> config.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_GROUP_PATTERN(), systemName), val));
- this.consumerReader.ifPresent(val -> config.put(String.format(HdfsConfig.FILE_READER_TYPE(), systemName), val));
- this.consumerStagingDirectory.ifPresent(
+ consumerReader.ifPresent(val -> config.put(String.format(HdfsConfig.FILE_READER_TYPE(), systemName), val));
+ consumerStagingDirectory.ifPresent(
val -> config.put(String.format(HdfsConfig.STAGING_DIRECTORY(), systemName), val));
return config;
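
The Optional.of -> Optional.ofNullable changes in this file are the one behavioral fix among the cleanups: StringUtils.stripToNull returns null for blank input, and Optional.of(null) throws a NullPointerException. A minimal demonstration (assumes commons-lang 2.x on the classpath, matching the import used in this module):

import java.util.Optional;
import org.apache.commons.lang.StringUtils;

public class StripToNullSketch {
  public static void main(String[] args) {
    // stripToNull("  ") is null, so Optional.of would throw an NPE here;
    // ofNullable turns the blank setting into Optional.empty() instead.
    Optional<String> format = Optional.ofNullable(StringUtils.stripToNull("  "));
    System.out.println(format.isPresent()); // false
  }
}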
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
index 0661139..8244504 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
@@ -20,11 +20,9 @@
package org.apache.samza.system.hdfs.partitioner;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -33,7 +31,6 @@ import java.util.regex.Pattern;
import javax.annotation.Nullable;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.samza.Partition;
@@ -63,13 +60,13 @@ public class DirectoryPartitioner {
private static final Logger LOG = LoggerFactory.getLogger(DirectoryPartitioner.class);
private static final String GROUP_IDENTIFIER = "\\[id]";
- private String whiteListRegex;
- private String blackListRegex;
- private String groupPattern;
- private FileSystemAdapter fileSystemAdapter;
+ private final String whiteListRegex;
+ private final String blackListRegex;
+ private final String groupPattern;
+ private final FileSystemAdapter fileSystemAdapter;
// stream name => partition => partition descriptor
- private Map<String, Map<Partition, List<String>>> partitionDescriptorMap = new HashMap<>();
+ private final Map<String, Map<Partition, List<String>>> partitionDescriptorMap = new HashMap<>();
public DirectoryPartitioner(String whiteList, String blackList, String groupPattern,
FileSystemAdapter fileSystemAdapter) {
@@ -93,7 +90,7 @@ public class DirectoryPartitioner {
allFiles.stream().filter(file -> file.getPath().matches(whiteListRegex) && !file.getPath().matches(blackListRegex))
.forEach(filteredFiles::add);
// sort the files to have a consistent order
- filteredFiles.sort((f1, f2) -> f1.getPath().compareTo(f2.getPath()));
+ filteredFiles.sort(Comparator.comparing(FileMetadata::getPath));
LOG.info(String.format("List of filtered files for %s: %s", streamName, filteredFiles));
return filteredFiles;
}
@@ -152,7 +149,7 @@ public class DirectoryPartitioner {
List<List<FileMetadata>> ret = new ArrayList<>();
// sort the map to guarantee consistent ordering
List<String> sortedKeys = new ArrayList<>(map.keySet());
- sortedKeys.sort(Comparator.<String>naturalOrder());
+ sortedKeys.sort(Comparator.naturalOrder());
sortedKeys.stream().forEach(key -> ret.add(map.get(key)));
return ret;
}
@@ -175,13 +172,7 @@ public class DirectoryPartitioner {
throw new SamzaException("The list of new files is not a super set of the old files. diff = "
+ oldFileSet.removeAll(newFileSet));
}
- Iterator<FileMetadata> iterator = newFileList.iterator();
- while (iterator.hasNext()) {
- FileMetadata file = iterator.next();
- if (!oldFileSet.contains(file.getPath())) {
- iterator.remove();
- }
- }
+ newFileList.removeIf(file -> !oldFileSet.contains(file.getPath()));
return newFileList;
}
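
The removeIf rewrite from this hunk, standalone:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RemoveIfSketch {
  public static void main(String[] args) {
    List<String> newFiles = new ArrayList<>(Arrays.asList("a.avro", "b.avro", "c.avro"));
    Set<String> oldFiles = new HashSet<>(Arrays.asList("a.avro", "b.avro"));
    // removeIf replaces the explicit Iterator loop: drop every file
    // that was not part of the previously known set.
    newFiles.removeIf(file -> !oldFiles.contains(file));
    System.out.println(newFiles); // [a.avro, b.avro]
  }
}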
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java
index 5fec4bf..7e35eec 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java
@@ -33,11 +33,11 @@ public interface FileSystemAdapter {
* @param streamName name of the stream
* @return list of <code>FileMetadata</code> for all files associated to the given stream
*/
- public List<FileMetadata> getAllFiles(String streamName);
+ List<FileMetadata> getAllFiles(String streamName);
- public class FileMetadata {
- private String path;
- private long length;
+ class FileMetadata {
+ private final String path;
+ private final long length;
public FileMetadata(String path, long length) {
this.path = path;
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
index eea68bb..cd8cc5e 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
@@ -46,7 +46,7 @@ public class MultiFileHdfsReader {
private final HdfsReaderFactory.ReaderType readerType;
private final SystemStreamPartition systemStreamPartition;
- private List<String> filePaths;
+ private final List<String> filePaths;
private SingleFileHdfsReader curReader;
private int curFileIndex = 0;
private String curSingleFileOffset;
@@ -127,7 +127,7 @@ public class MultiFileHdfsReader {
this.filePaths = partitionDescriptors;
this.numMaxRetries = numMaxRetries;
this.numRetries = 0;
- if (partitionDescriptors.size() <= 0) {
+ if (partitionDescriptors.isEmpty()) {
throw new SamzaException(
"Invalid number of files based on partition descriptors: " + partitionDescriptors.size());
}
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java
index eb8a70d..cbae032 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java
@@ -28,35 +28,35 @@ public interface SingleFileHdfsReader {
* @param path path of the file to be read
* @param offset offset the reader should start from
*/
- public void open(String path, String offset);
+ void open(String path, String offset);
/**
* Seek to a specific offset
* @param offset offset the reader should seek to
*/
- public void seek(String offset);
+ void seek(String offset);
/**
* Construct and return the next message envelope
* @return constructed IncomeMessageEnvelope
*/
- public IncomingMessageEnvelope readNext();
+ IncomingMessageEnvelope readNext();
/**
* Get the next offset, which is the offset for the next message
* that will be returned by readNext
* @return next offset
*/
- public String nextOffset();
+ String nextOffset();
/**
* Whether there are still records to be read
* @return true of false based on whether the reader has hit end of file
*/
- public boolean hasNext();
+ boolean hasNext();
/**
* Close the reader.
*/
- public void close();
+ void close();
}
diff --git a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
index 21709be..dea60b3 100644
--- a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
+++ b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
@@ -71,8 +71,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
final String groupId = createConsumerGroupId(config);
- Map<String, Object> consumerProps = new HashMap<>();
- consumerProps.putAll(subConf);
+ Map<String, Object> consumerProps = new HashMap<>(subConf);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
@@ -115,8 +114,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
}
// Override default max poll config if there is no value
- consumerProps.computeIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
- (k) -> DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS);
+ consumerProps.putIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS);
return new KafkaConsumerConfig(consumerProps, systemName);
}
@@ -194,7 +192,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
return autoOffsetReset;
}
// translate old kafka consumer values into new ones (SAMZA-1987 top remove it)
- String newAutoOffsetReset = null;
+ String newAutoOffsetReset;
switch (autoOffsetReset) {
case "largest":
newAutoOffsetReset = KAFKA_OFFSET_LATEST;
@@ -230,4 +228,4 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
return newAutoOffsetReset;
}
-}
\ No newline at end of file
+}
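
On the computeIfAbsent -> putIfAbsent change above: for a constant default the two are equivalent here, but putIfAbsent states the intent directly, while computeIfAbsent is only worth it when the value is expensive to build. A small sketch (key and values are illustrative):

import java.util.HashMap;
import java.util.Map;

public class PutIfAbsentSketch {
  public static void main(String[] args) {
    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.putIfAbsent("max.poll.records", 100);
    // Second call is a no-op: the key is already present.
    consumerProps.putIfAbsent("max.poll.records", 500);
    System.out.println(consumerProps); // {max.poll.records=100}
  }
}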
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
index 113dced..d621308 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
* Extends StreamSpec with the ability to easily get the topic replication factor.
*/
public class KafkaStreamSpec extends StreamSpec {
- private static Logger LOG = LoggerFactory.getLogger(KafkaStreamSpec.class);
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamSpec.class);
private static final int DEFAULT_REPLICATION_FACTOR = 2;
@@ -59,10 +59,8 @@ public class KafkaStreamSpec extends StreamSpec {
* @return The Map instance.
*/
private static Map<String, String> propertiesToMap(Properties properties) {
- Map<String, String> map = new HashMap<String, String>();
- for (final String name: properties.stringPropertyNames()) {
- map.put(name, properties.getProperty(name));
- }
+ Map<String, String> map = new HashMap<>();
+ properties.stringPropertyNames().forEach(name -> map.put(name, properties.getProperty(name)));
return map;
}
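
The propertiesToMap rewrite, runnable in isolation:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class PropertiesToMapSketch {
  private static Map<String, String> propertiesToMap(Properties properties) {
    Map<String, String> map = new HashMap<>();
    // stringPropertyNames() also walks the defaults chain; the forEach
    // is a drop-in for the removed explicit for-loop.
    properties.stringPropertyNames().forEach(name -> map.put(name, properties.getProperty(name)));
    return map;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("replication.factor", "2");
    System.out.println(propertiesToMap(props)); // {replication.factor=2}
  }
}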
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index 36aa695..f4db408 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -244,9 +244,7 @@ public class KafkaSystemAdmin implements SystemAdmin {
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(streamName);
LOG.debug("Stream {} has partitions {}", streamName, partitionInfos);
- partitionInfos.forEach(partitionInfo -> {
- partitionMetadata.put(new Partition(partitionInfo.partition()), dummySspm);
- });
+ partitionInfos.forEach(partitionInfo -> partitionMetadata.put(new Partition(partitionInfo.partition()), dummySspm));
allMetadata.put(streamName, new SystemStreamMetadata(streamName, partitionMetadata));
});
@@ -370,9 +368,7 @@ public class KafkaSystemAdmin implements SystemAdmin {
}
};
- Map<String, SystemStreamMetadata> result =
- retryBackoff.run(fetchMetadataOperation, onExceptionRetryOperation).getOrElse(fallbackOperation);
- return result;
+ return retryBackoff.run(fetchMetadataOperation, onExceptionRetryOperation).getOrElse(fallbackOperation);
}
/**
@@ -401,9 +397,7 @@ public class KafkaSystemAdmin implements SystemAdmin {
Map<TopicPartition, Long> upcomingOffsetsWithLong = metadataConsumer.endOffsets(topicPartitions);
LOG.debug("Kafka-fetched endOffsets: {}", upcomingOffsetsWithLong);
- oldestOffsetsWithLong.forEach((topicPartition, offset) -> {
- oldestOffsets.put(toSystemStreamPartition(topicPartition), String.valueOf(offset));
- });
+ oldestOffsetsWithLong.forEach((topicPartition, offset) -> oldestOffsets.put(toSystemStreamPartition(topicPartition), String.valueOf(offset)));
upcomingOffsetsWithLong.forEach((topicPartition, offset) -> {
upcomingOffsets.put(toSystemStreamPartition(topicPartition), String.valueOf(offset));
@@ -511,7 +505,7 @@ public class KafkaSystemAdmin implements SystemAdmin {
NewTopic newTopic = new NewTopic(topicName, kSpec.getPartitionCount(), (short) kSpec.getReplicationFactor());
// specify the configs
- Map<String, String> streamConfig = new HashMap(streamSpec.getConfig());
+ Map<String, String> streamConfig = new HashMap<>(streamSpec.getConfig());
// HACK - replication.factor is invalid config for AdminClient.createTopics
if (streamConfig.containsKey(REPL_FACTOR)) {
String repl = streamConfig.get(REPL_FACTOR);
@@ -561,7 +555,7 @@ public class KafkaSystemAdmin implements SystemAdmin {
}
/**
- * Converts a StreamSpec into a KafakStreamSpec. Special handling for coordinator and changelog stream.
+ * Converts a StreamSpec into a KafkaStreamSpec. Special handling for coordinator and changelog stream.
* @param spec a StreamSpec object
* @return KafkaStreamSpec object
*/
@@ -615,7 +609,7 @@ public class KafkaSystemAdmin implements SystemAdmin {
// get partition info for topic
Map<String, List<PartitionInfo>> getTopicMetadata(Set<String> topics) {
- Map<String, List<PartitionInfo>> streamToPartitionsInfo = new HashMap();
+ Map<String, List<PartitionInfo>> streamToPartitionsInfo = new HashMap<>();
List<PartitionInfo> partitionInfoList;
for (String topic : topics) {
partitionInfoList = metadataConsumer.partitionsFor(topic);
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java
index d9477e5..54f433f 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java
@@ -71,7 +71,7 @@ public class KafkaInputDescriptor<StreamMessageType>
* @return this input descriptor
*/
public KafkaInputDescriptor<StreamMessageType> withConsumerAutoOffsetReset(String consumerAutoOffsetReset) {
- this.consumerAutoOffsetResetOptional = Optional.of(StringUtils.stripToNull(consumerAutoOffsetReset));
+ this.consumerAutoOffsetResetOptional = Optional.ofNullable(StringUtils.stripToNull(consumerAutoOffsetReset));
return this;
}
@@ -94,7 +94,7 @@ public class KafkaInputDescriptor<StreamMessageType>
@Override
public Map<String, String> toConfig() {
- HashMap<String, String> configs = new HashMap<>(super.toConfig());
+ Map<String, String> configs = new HashMap<>(super.toConfig());
// Note: Kafka configuration needs the topic's physical name, not the stream-id.
// We won't have that here if user only specified it in configs, or if it got rewritten
// by the planner to something different than what's in this descriptor.
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java
index 091c21a..8c4d48b 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java
@@ -120,7 +120,7 @@ public class KafkaSystemDescriptor extends SystemDescriptor<KafkaSystemDescripto
* @return this system descriptor
*/
public KafkaSystemDescriptor withConsumerAutoOffsetReset(String consumerAutoOffsetReset) {
- this.consumerAutoOffsetResetOptional = Optional.of(StringUtils.stripToNull(consumerAutoOffsetReset));
+ this.consumerAutoOffsetResetOptional = Optional.ofNullable(StringUtils.stripToNull(consumerAutoOffsetReset));
return this;
}
@@ -246,4 +246,4 @@ public class KafkaSystemDescriptor extends SystemDescriptor<KafkaSystemDescripto
producerConfigs.forEach((key, value) -> configs.put(String.format(PRODUCER_CONFIGS_CONFIG_KEY, getSystemName(), key), value));
return configs;
}
-}
\ No newline at end of file
+}
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 93b1ab2..ac0a55c 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -57,7 +57,7 @@ public class KafkaConsumerProxy<K, V> {
private static final int SLEEP_MS_WHILE_NO_TOPIC_PARTITION = 100;
- final Thread consumerPollThread;
+ private final Thread consumerPollThread;
private final Consumer<K, V> kafkaConsumer;
private final KafkaSystemConsumer.KafkaConsumerMessageSink sink;
private final KafkaSystemConsumerMetrics kafkaConsumerMetrics;
@@ -187,7 +187,7 @@ public class KafkaConsumerProxy<K, V> {
// creates a separate thread for getting the messages.
private Runnable createProxyThreadRunnable() {
- Runnable runnable = () -> {
+ return () -> {
isRunning = true;
try {
@@ -208,8 +208,6 @@ public class KafkaConsumerProxy<K, V> {
LOG.info("KafkaConsumerProxy for system {} has stopped.", systemName);
}
};
-
- return runnable;
}
private void fetchMessages() {
@@ -305,11 +303,7 @@ public class KafkaConsumerProxy<K, V> {
updateMetrics(record, tp);
SystemStreamPartition ssp = topicPartitionToSSP.get(tp);
- List<IncomingMessageEnvelope> messages = results.get(ssp);
- if (messages == null) {
- messages = new ArrayList<>();
- results.put(ssp, messages);
- }
+ List<IncomingMessageEnvelope> messages = results.computeIfAbsent(ssp, k -> new ArrayList<>());
IncomingMessageEnvelope incomingMessageEnvelope = handleNewRecord(record, ssp);
messages.add(incomingMessageEnvelope);
@@ -359,7 +353,7 @@ public class KafkaConsumerProxy<K, V> {
if (lag == null) {
throw new SamzaException("Unknown/unregistered ssp in latestLags. ssp=" + ssp + "; system=" + systemName);
}
- long currentSSPLag = lag.longValue(); // lag between the current offset and the highwatermark
+ long currentSSPLag = lag; // lag between the current offset and the highwatermark
if (currentSSPLag < 0) {
return;
}
@@ -397,7 +391,7 @@ public class KafkaConsumerProxy<K, V> {
// populate the MetricNames first time
if (perPartitionMetrics.isEmpty()) {
- HashMap<String, String> tags = new HashMap<>();
+ Map<String, String> tags = new HashMap<>();
tags.put("client-id", clientId); // this is required by the KafkaConsumer to get the metrics
for (SystemStreamPartition ssp : ssps) {
@@ -429,10 +423,10 @@ public class KafkaConsumerProxy<K, V> {
Long lag = latestLags.get(ssp);
LOG.trace("Latest offset of {} is {}; lag = {}", ssp, offset, lag);
if (lag != null && offset != null && lag >= 0) {
- long streamEndOffset = offset.longValue() + lag.longValue();
+ long streamEndOffset = offset + lag;
// update the metrics
kafkaConsumerMetrics.setHighWatermarkValue(tp, streamEndOffset);
- kafkaConsumerMetrics.setLagValue(tp, lag.longValue());
+ kafkaConsumerMetrics.setLagValue(tp, lag);
}
}
}
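
And the computeIfAbsent accumulation idiom from fetchMessages(), standalone (String keys stand in for SystemStreamPartition):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ComputeIfAbsentSketch {
  public static void main(String[] args) {
    Map<String, List<Integer>> results = new HashMap<>();
    // computeIfAbsent returns the existing list or creates one on first
    // use, replacing the null-check-then-put dance removed above.
    for (int offset : new int[]{1, 2, 3}) {
      results.computeIfAbsent("partition-0", k -> new ArrayList<>()).add(offset);
    }
    System.out.println(results); // {partition-0=[1, 2, 3]}
  }
}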