Posted to commits@samza.apache.org by ca...@apache.org on 2020/06/29 18:42:01 UTC
[samza] branch master updated: SAMZA-2551: Upgrade all modules to automatically use checkstyle 6.11.2 (part 3: includes samza-kafka) (#1394)
This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new fcf1bbc SAMZA-2551: Upgrade all modules to automatically use checkstyle 6.11.2 (part 3: includes samza-kafka) (#1394)
fcf1bbc is described below
commit fcf1bbc8fcc41fc38eb870dc16c10cf55e78d706
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Mon Jun 29 11:41:53 2020 -0700
SAMZA-2551: Upgrade all modules to automatically use checkstyle 6.11.2 (part 3: includes samza-kafka) (#1394)
API/Upgrade/Usage changes: None
---
build.gradle | 6 +
checkstyle/checkstyle-suppressions.xml | 11 +
.../kafka/KafkaCheckpointLogKeySerde.java | 6 +-
.../apache/samza/config/KafkaConsumerConfig.java | 30 ++-
.../samza/system/kafka/KafkaConsumerProxy.java | 11 +-
.../system/kafka/KafkaConsumerProxyFactory.java | 2 -
.../apache/samza/system/kafka/KafkaStreamSpec.java | 18 +-
.../samza/system/kafka/KafkaSystemAdmin.java | 242 ++++++++++-----------
.../samza/system/kafka/KafkaSystemConsumer.java | 6 +-
.../kafka/descriptors/KafkaSystemDescriptor.java | 2 +-
.../kafka/TestKafkaCheckpointManagerJava.java | 2 +-
.../samza/system/kafka/MockKafkaProducer.java | 41 ++--
.../kafka/TestKafkaCheckpointManagerFactory.java | 1 -
.../samza/system/kafka/TestKafkaStreamSpec.java | 8 +-
.../system/kafka/TestKafkaSystemAdminJava.java | 24 +-
.../system/kafka/TestKafkaSystemAdminWithMock.java | 2 +-
.../system/kafka/TestKafkaSystemConsumer.java | 5 +-
.../kafka/TestKafkaSystemConsumerMetrics.java | 5 +-
.../system/kafka/TestKafkaSystemProducerJava.java | 25 +--
.../descriptors/TestKafkaInputDescriptor.java | 5 +-
.../descriptors/TestKafkaSystemDescriptor.java | 3 +-
21 files changed, 221 insertions(+), 234 deletions(-)
diff --git a/build.gradle b/build.gradle
index 25afaeb..7268838 100644
--- a/build.gradle
+++ b/build.gradle
@@ -397,6 +397,7 @@ project(":samza-tools_$scalaSuffix") {
project(":samza-kafka_$scalaSuffix") {
apply plugin: 'scala'
+ apply plugin: 'checkstyle'
// Force scala joint compilation
sourceSets.main.scala.srcDir "src/main/java"
@@ -434,6 +435,11 @@ project(":samza-kafka_$scalaSuffix") {
testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
}
+ checkstyle {
+ configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+ toolVersion = "$checkstyleVersion"
+ }
+
test {
// Bump up the heap so we can start ZooKeeper and Kafka brokers.
minHeapSize = "1560m"
diff --git a/checkstyle/checkstyle-suppressions.xml b/checkstyle/checkstyle-suppressions.xml
index e887fdc..0c9bad5 100644
--- a/checkstyle/checkstyle-suppressions.xml
+++ b/checkstyle/checkstyle-suppressions.xml
@@ -25,8 +25,19 @@
<suppress checks="ConstantName"
files="ApplicationStatus.java"
lines="26-29"/>
+ <!--
+ API javadocs (including descriptors) may reference classes (using '{@link <className>}') which
+ aren't directly used in the API class. In order to help the API javadoc look cleaner, we can use
+ the simple class name in the javadoc and then import that class. However, for non-API code, we
+ want checkstyle to prevent an import just for documentation purposes. The
+ "preventJavadocUnusedImports" id includes processing of javadocs when checking unused imports.
+ We will apply that to the API classes only.
+ -->
<suppress id="preventJavadocUnusedImports"
files=".*samza-api.*"/>
+ <suppress id="preventJavadocUnusedImports"
+ files="samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors" />
+
<!-- suppress avro schema classes since they are based on auto-generated code -->
<suppress files="samza-sql/src/test/java/org/apache/samza/sql/avro/schemas" checks=".*" />
</suppressions>
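[Editor's note: the new suppression is needed because the unused-import handling behind the custom preventJavadocUnusedImports id (presumably an UnusedImports variant that treats javadoc-only references as unused) would otherwise flag imports kept purely for {@link} tags. A hypothetical sketch of the two styles; the TestKafkaStreamSpec hunk later in this commit applies the second one:

    // API/descriptor code (suppressed): the import may exist solely for javadoc.
    import org.apache.samza.system.SystemStream;

    /**
     * See {@link SystemStream} for the abstraction this class describes.
     */
    public class ApiStyleExample { }

    // Non-API code: no documentation-only import is allowed, so the javadoc
    // must spell out the fully qualified name instead:
    //   {@link org.apache.samza.system.SystemStream}
]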
diff --git a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
index 8e0c815..cc771d3 100644
--- a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
+++ b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
@@ -39,12 +39,12 @@ public class KafkaCheckpointLogKeySerde implements Serde<KafkaCheckpointLogKey>
private static final String SSP_GROUPER_FACTORY_FIELD = "systemstreampartition-grouper-factory";
private static final String TASK_NAME_FIELD = "taskName";
private static final String TYPE_FIELD = "type";
- private static final ObjectMapper mapper = new ObjectMapper();
+ private static final ObjectMapper MAPPER = new ObjectMapper();
@Override
public byte[] toBytes(KafkaCheckpointLogKey key) {
try {
- return mapper.writeValueAsBytes(ImmutableMap.of(
+ return MAPPER.writeValueAsBytes(ImmutableMap.of(
SSP_GROUPER_FACTORY_FIELD, key.getGrouperFactoryClassName(),
TASK_NAME_FIELD, key.getTaskName().toString(),
TYPE_FIELD, key.getType()
@@ -57,7 +57,7 @@ public class KafkaCheckpointLogKeySerde implements Serde<KafkaCheckpointLogKey>
@Override
public KafkaCheckpointLogKey fromBytes(byte[] bytes) {
try {
- LinkedHashMap<String, String> deserializedKey = mapper.readValue(bytes, LinkedHashMap.class);
+ LinkedHashMap<String, String> deserializedKey = MAPPER.readValue(bytes, LinkedHashMap.class);
if (!KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE.equals(deserializedKey.get(TYPE_FIELD))) {
throw new IllegalArgumentException(String.format("Invalid key detected. Type of the key is %s", deserializedKey.get(TYPE_FIELD)));
diff --git a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
index 3ac84df..d7464c6 100644
--- a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
+++ b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,14 +15,11 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-
package org.apache.samza.config;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
@@ -128,7 +124,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
}
public int fetchMessageMaxBytes() {
- String fetchSize = (String)get("fetch.message.max.bytes");
+ String fetchSize = (String) get("fetch.message.max.bytes");
if (StringUtils.isBlank(fetchSize)) {
return FETCH_MAX_BYTES;
} else {
@@ -177,26 +173,26 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
*/
static String getAutoOffsetResetValue(final String autoOffsetReset, final String samzaOffsetDefault) {
// valid kafka consumer values
- final String KAFKA_OFFSET_LATEST = "latest";
- final String KAFKA_OFFSET_EARLIEST = "earliest";
- final String KAFKA_OFFSET_NONE = "none";
+ final String kafkaOffsetLatest = "latest";
+ final String kafkaOffsetEarliest = "earliest";
+ final String kafkaOffsetNone = "none";
// if the value for KafkaConsumer is set - use it.
if (!StringUtils.isBlank(autoOffsetReset)) {
- if (autoOffsetReset.equals(KAFKA_OFFSET_EARLIEST) || autoOffsetReset.equals(KAFKA_OFFSET_LATEST)
- || autoOffsetReset.equals(KAFKA_OFFSET_NONE)) {
+ if (autoOffsetReset.equals(kafkaOffsetEarliest) || autoOffsetReset.equals(kafkaOffsetLatest)
+ || autoOffsetReset.equals(kafkaOffsetNone)) {
return autoOffsetReset;
}
// translate old kafka consumer values into new ones (SAMZA-1987 top remove it)
String newAutoOffsetReset;
switch (autoOffsetReset) {
case "largest":
- newAutoOffsetReset = KAFKA_OFFSET_LATEST;
- LOG.warn("Using old (deprecated) value for kafka consumer config auto.offset.reset = {}. The right value should be {}", autoOffsetReset, KAFKA_OFFSET_LATEST);
+ newAutoOffsetReset = kafkaOffsetLatest;
+ LOG.warn("Using old (deprecated) value for kafka consumer config auto.offset.reset = {}. The right value should be {}", autoOffsetReset, kafkaOffsetLatest);
break;
case "smallest":
- newAutoOffsetReset = KAFKA_OFFSET_EARLIEST;
- LOG.warn("Using old (deprecated) value for kafka consumer config auto.offset.reset = {}. The right value should be {}", autoOffsetReset, KAFKA_OFFSET_EARLIEST);
+ newAutoOffsetReset = kafkaOffsetEarliest;
+ LOG.warn("Using old (deprecated) value for kafka consumer config auto.offset.reset = {}. The right value should be {}", autoOffsetReset, kafkaOffsetEarliest);
break;
default:
throw new SamzaException("Using invalid value for kafka consumer config auto.offset.reset " + autoOffsetReset + ". See KafkaConsumer config for the correct values.");
@@ -207,14 +203,14 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
}
// in case kafka consumer configs are not provided we should match them to Samza's ones.
- String newAutoOffsetReset = KAFKA_OFFSET_LATEST;
+ String newAutoOffsetReset = kafkaOffsetLatest;
if (!StringUtils.isBlank(samzaOffsetDefault)) {
switch (samzaOffsetDefault) {
case SystemConfig.SAMZA_SYSTEM_OFFSET_UPCOMING:
- newAutoOffsetReset = KAFKA_OFFSET_LATEST;
+ newAutoOffsetReset = kafkaOffsetLatest;
break;
case SystemConfig.SAMZA_SYSTEM_OFFSET_OLDEST:
- newAutoOffsetReset = KAFKA_OFFSET_EARLIEST;
+ newAutoOffsetReset = kafkaOffsetEarliest;
break;
default:
throw new SamzaException("Using invalid value for samza default offset config " + autoOffsetReset + ". See samza config for the correct values");
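[Editor's note: the renames above reflect the split between two stock checks (assumed enabled by the shared config): ConstantName governs static final fields, while LocalFinalVariableName treats method-local finals as ordinary locals, so UPPER_SNAKE_CASE names like KAFKA_OFFSET_LATEST are flagged when declared inside a method. A hypothetical sketch:

    public class OffsetNamingExample {
      // Static final field: UPPER_SNAKE_CASE is required (ConstantName).
      private static final String DEFAULT_RESET = "latest";

      static String resolve(String samzaDefault) {
        // Method-local final: camelCase is required (LocalFinalVariableName),
        // which is why kafkaOffsetLatest replaced KAFKA_OFFSET_LATEST above.
        final String kafkaOffsetEarliest = "earliest";
        return "oldest".equals(samzaDefault) ? kafkaOffsetEarliest : DEFAULT_RESET;
      }
    }
]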
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 71ebe00..329073c 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,9 +15,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-
package org.apache.samza.system.kafka;
import java.time.Instant;
@@ -124,12 +121,12 @@ public class KafkaConsumerProxy<K, V> {
isRunning = false;
try {
- consumerPollThread.join(timeoutMs/2);
+ consumerPollThread.join(timeoutMs / 2);
// join() may timeout
// in this case we should interrupt it and wait again
if (consumerPollThread.isAlive()) {
consumerPollThread.interrupt();
- consumerPollThread.join(timeoutMs/2);
+ consumerPollThread.join(timeoutMs / 2);
}
} catch (InterruptedException e) {
LOG.warn("Join in KafkaConsumerProxy has failed", e);
@@ -452,13 +449,11 @@ public class KafkaConsumerProxy<K, V> {
}
}
- @Override
+ @Override
public String toString() {
return String.format("consumerProxy-%s-%s", systemName, clientId);
}
-
-
/**
* Used to create an instance of {@link KafkaConsumerProxy}. This can be overridden in case an extension of
* {@link KafkaConsumerProxy} needs to be used within kafka system components like {@link KafkaSystemConsumer}.
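[Editor's note: the timeoutMs / 2 fix here and the (String) get(...) fix in KafkaConsumerConfig are pure whitespace changes, most likely from the stock WhitespaceAround check (binary operators must be padded) and WhitespaceAfter with the TYPECAST token (a cast must be followed by a space). A small sketch with hypothetical names:

    import java.util.Map;

    public class WhitespaceExample {
      long half(long timeoutMs) {
        // return timeoutMs/2;   // flagged: '/' is not surrounded by whitespace
        return timeoutMs / 2;
      }

      String fetchSize(Map<String, Object> config) {
        // return (String)config.get("fetch.message.max.bytes");  // flagged: no space after the cast
        return (String) config.get("fetch.message.max.bytes");
      }
    }
]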
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java
index cc4bddc..903e511 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,7 +15,6 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
package org.apache.samza.system.kafka;
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
index d621308..3324c67 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
@@ -97,18 +97,18 @@ public class KafkaStreamSpec extends StreamSpec {
*/
public static KafkaStreamSpec fromSpec(StreamSpec originalSpec) {
if (originalSpec instanceof KafkaStreamSpec) {
- return ((KafkaStreamSpec) originalSpec);
+ return (KafkaStreamSpec) originalSpec;
}
- int replicationFactor = Integer.parseInt(originalSpec.getOrDefault( KafkaConfig.TOPIC_REPLICATION_FACTOR(),
- KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR()));
+ int replicationFactor = Integer.parseInt(originalSpec.getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR(),
+ KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR()));
- return new KafkaStreamSpec( originalSpec.getId(),
- originalSpec.getPhysicalName(),
- originalSpec.getSystemName(),
- originalSpec.getPartitionCount(),
- replicationFactor,
- mapToProperties(filterUnsupportedProperties(originalSpec.getConfig())));
+ return new KafkaStreamSpec(originalSpec.getId(),
+ originalSpec.getPhysicalName(),
+ originalSpec.getSystemName(),
+ originalSpec.getPartitionCount(),
+ replicationFactor,
+ mapToProperties(filterUnsupportedProperties(originalSpec.getConfig())));
}
/**
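[Editor's note: this hunk fixes call-site padding and continuation indentation: getOrDefault( had a space after the opening parenthesis, which something like the stock ParenPad check (assumed here with its default nospace option) rejects, and the wrapped constructor arguments were re-indented to a uniform continuation offset. In miniature:

    import java.util.Map;

    public class ParenPadExample {
      static int replication(Map<String, String> config) {
        // return Integer.parseInt(config.getOrDefault( "replication.factor", "2"));  // flagged: space after '('
        return Integer.parseInt(config.getOrDefault("replication.factor", "2"));
      }
    }
]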
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index 91d4b11..a60752c 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -213,49 +213,49 @@ public class KafkaSystemAdmin implements SystemAdmin {
public Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL) {
// This optimization omits actual metadata for performance. Instead, we inject a dummy for all partitions.
final SystemStreamMetadata.SystemStreamPartitionMetadata dummySspm =
- new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null) {
- String msg =
- "getSystemStreamPartitionCounts does not populate SystemStreaMetadata info. Only number of partitions";
+ new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null) {
+ String msg =
+ "getSystemStreamPartitionCounts does not populate SystemStreaMetadata info. Only number of partitions";
- @Override
- public String getOldestOffset() {
- throw new NotImplementedException(msg);
- }
+ @Override
+ public String getOldestOffset() {
+ throw new NotImplementedException(msg);
+ }
- @Override
- public String getNewestOffset() {
- throw new NotImplementedException(msg);
- }
+ @Override
+ public String getNewestOffset() {
+ throw new NotImplementedException(msg);
+ }
- @Override
- public String getUpcomingOffset() {
- throw new NotImplementedException(msg);
- }
- };
+ @Override
+ public String getUpcomingOffset() {
+ throw new NotImplementedException(msg);
+ }
+ };
ExponentialSleepStrategy strategy = new ExponentialSleepStrategy(DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER,
DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS, DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS);
Function1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>> fetchMetadataOperation =
- new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>>() {
- @Override
- public Map<String, SystemStreamMetadata> apply(ExponentialSleepStrategy.RetryLoop loop) {
- Map<String, SystemStreamMetadata> allMetadata = new HashMap<>();
-
- streamNames.forEach(streamName -> {
- Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
-
- List<PartitionInfo> partitionInfos = threadSafeKafkaConsumer.execute(consumer -> consumer.partitionsFor(streamName));
- LOG.debug("Stream {} has partitions {}", streamName, partitionInfos);
- partitionInfos.forEach(
- partitionInfo -> partitionMetadata.put(new Partition(partitionInfo.partition()), dummySspm));
- allMetadata.put(streamName, new SystemStreamMetadata(streamName, partitionMetadata));
- });
-
- loop.done();
- return allMetadata;
- }
- };
+ new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>>() {
+ @Override
+ public Map<String, SystemStreamMetadata> apply(ExponentialSleepStrategy.RetryLoop loop) {
+ Map<String, SystemStreamMetadata> allMetadata = new HashMap<>();
+
+ streamNames.forEach(streamName -> {
+ Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
+
+ List<PartitionInfo> partitionInfos = threadSafeKafkaConsumer.execute(consumer -> consumer.partitionsFor(streamName));
+ LOG.debug("Stream {} has partitions {}", streamName, partitionInfos);
+ partitionInfos.forEach(
+ partitionInfo -> partitionMetadata.put(new Partition(partitionInfo.partition()), dummySspm));
+ allMetadata.put(streamName, new SystemStreamMetadata(streamName, partitionMetadata));
+ });
+
+ loop.done();
+ return allMetadata;
+ }
+ };
Map<String, SystemStreamMetadata> result = strategy.run(fetchMetadataOperation,
new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>() {
@@ -323,51 +323,51 @@ public class KafkaSystemAdmin implements SystemAdmin {
Function1<ExponentialSleepStrategy.RetryLoop, Map<SystemStreamPartition,
SystemStreamMetadata.SystemStreamPartitionMetadata>> fetchTopicPartitionMetadataOperation =
- new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<SystemStreamPartition,
- SystemStreamMetadata.SystemStreamPartitionMetadata>>() {
-
- @Override
- public Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> apply(
- ExponentialSleepStrategy.RetryLoop loop) {
- OffsetsMaps topicPartitionsMetadata = fetchTopicPartitionsMetadata(topicPartitions);
-
- Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> sspToSSPMetadata = new HashMap<>();
- for (SystemStreamPartition ssp : ssps) {
- String oldestOffset = topicPartitionsMetadata.getOldestOffsets().get(ssp);
- String newestOffset = topicPartitionsMetadata.getNewestOffsets().get(ssp);
- String upcomingOffset = topicPartitionsMetadata.getUpcomingOffsets().get(ssp);
-
- sspToSSPMetadata.put(ssp,
- new SystemStreamMetadata.SystemStreamPartitionMetadata(oldestOffset, newestOffset, upcomingOffset));
- }
- loop.done();
- return sspToSSPMetadata;
+ new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<SystemStreamPartition,
+ SystemStreamMetadata.SystemStreamPartitionMetadata>>() {
+
+ @Override
+ public Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> apply(
+ ExponentialSleepStrategy.RetryLoop loop) {
+ OffsetsMaps topicPartitionsMetadata = fetchTopicPartitionsMetadata(topicPartitions);
+
+ Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> sspToSSPMetadata = new HashMap<>();
+ for (SystemStreamPartition ssp : ssps) {
+ String oldestOffset = topicPartitionsMetadata.getOldestOffsets().get(ssp);
+ String newestOffset = topicPartitionsMetadata.getNewestOffsets().get(ssp);
+ String upcomingOffset = topicPartitionsMetadata.getUpcomingOffsets().get(ssp);
+
+ sspToSSPMetadata.put(ssp,
+ new SystemStreamMetadata.SystemStreamPartitionMetadata(oldestOffset, newestOffset, upcomingOffset));
}
- };
+ loop.done();
+ return sspToSSPMetadata;
+ }
+ };
Function2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit> onExceptionRetryOperation =
- new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>() {
- @Override
- public BoxedUnit apply(Exception exception, ExponentialSleepStrategy.RetryLoop loop) {
- if (loop.sleepCount() < MAX_RETRIES_ON_EXCEPTION) {
- LOG.warn(
- String.format("Fetching SSP metadata for: %s threw an exception. Retrying.", ssps), exception);
- } else {
- LOG.error(String.format("Fetching SSP metadata for: %s threw an exception.", ssps), exception);
- loop.done();
- throw new SamzaException(exception);
- }
- return null;
+ new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>() {
+ @Override
+ public BoxedUnit apply(Exception exception, ExponentialSleepStrategy.RetryLoop loop) {
+ if (loop.sleepCount() < MAX_RETRIES_ON_EXCEPTION) {
+ LOG.warn(
+ String.format("Fetching SSP metadata for: %s threw an exception. Retrying.", ssps), exception);
+ } else {
+ LOG.error(String.format("Fetching SSP metadata for: %s threw an exception.", ssps), exception);
+ loop.done();
+ throw new SamzaException(exception);
}
- };
+ return null;
+ }
+ };
Function0<Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata>> fallbackOperation =
- new AbstractFunction0<Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata>>() {
- @Override
- public Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> apply() {
- throw new SamzaException("Failed to get SSP metadata");
- }
- };
+ new AbstractFunction0<Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata>>() {
+ @Override
+ public Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> apply() {
+ throw new SamzaException("Failed to get SSP metadata");
+ }
+ };
return retryBackoff.run(fetchTopicPartitionMetadataOperation, onExceptionRetryOperation).getOrElse(fallbackOperation);
}
@@ -389,41 +389,41 @@ public class KafkaSystemAdmin implements SystemAdmin {
LOG.info("Fetching system stream metadata for {} from system {}", streamNames, systemName);
Function1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>> fetchMetadataOperation =
- new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>>() {
- @Override
- public Map<String, SystemStreamMetadata> apply(ExponentialSleepStrategy.RetryLoop loop) {
- Map<String, SystemStreamMetadata> metadata = fetchSystemStreamMetadata(streamNames);
- loop.done();
- return metadata;
- }
- };
+ new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>>() {
+ @Override
+ public Map<String, SystemStreamMetadata> apply(ExponentialSleepStrategy.RetryLoop loop) {
+ Map<String, SystemStreamMetadata> metadata = fetchSystemStreamMetadata(streamNames);
+ loop.done();
+ return metadata;
+ }
+ };
Function2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit> onExceptionRetryOperation =
- new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>() {
- @Override
- public BoxedUnit apply(Exception exception, ExponentialSleepStrategy.RetryLoop loop) {
- if (loop.sleepCount() < MAX_RETRIES_ON_EXCEPTION) {
- LOG.warn(
- String.format("Fetching system stream metadata for: %s threw an exception. Retrying.", streamNames),
- exception);
- } else {
- LOG.error(String.format("Fetching system stream metadata for: %s threw an exception.", streamNames),
- exception);
- loop.done();
- throw new SamzaException(exception);
- }
-
- return null;
+ new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>() {
+ @Override
+ public BoxedUnit apply(Exception exception, ExponentialSleepStrategy.RetryLoop loop) {
+ if (loop.sleepCount() < MAX_RETRIES_ON_EXCEPTION) {
+ LOG.warn(
+ String.format("Fetching system stream metadata for: %s threw an exception. Retrying.", streamNames),
+ exception);
+ } else {
+ LOG.error(String.format("Fetching system stream metadata for: %s threw an exception.", streamNames),
+ exception);
+ loop.done();
+ throw new SamzaException(exception);
}
- };
+
+ return null;
+ }
+ };
Function0<Map<String, SystemStreamMetadata>> fallbackOperation =
- new AbstractFunction0<Map<String, SystemStreamMetadata>>() {
- @Override
- public Map<String, SystemStreamMetadata> apply() {
- throw new SamzaException("Failed to get system stream metadata");
- }
- };
+ new AbstractFunction0<Map<String, SystemStreamMetadata>>() {
+ @Override
+ public Map<String, SystemStreamMetadata> apply() {
+ throw new SamzaException("Failed to get system stream metadata");
+ }
+ };
return retryBackoff.run(fetchMetadataOperation, onExceptionRetryOperation).getOrElse(fallbackOperation);
}
@@ -486,16 +486,16 @@ public class KafkaSystemAdmin implements SystemAdmin {
topics.forEach(topic -> {
OffsetsMaps offsetsForTopic = threadSafeKafkaConsumer.execute(consumer -> {
- List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
- if (partitionInfos == null) {
- String msg = String.format("Partition info not(yet?) available for system %s topic %s", systemName, topic);
- throw new SamzaException(msg);
- }
- List<TopicPartition> topicPartitions = partitionInfos.stream()
- .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
- .collect(Collectors.toList());
- return fetchTopicPartitionsMetadata(topicPartitions);
- });
+ List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+ if (partitionInfos == null) {
+ String msg = String.format("Partition info not(yet?) available for system %s topic %s", systemName, topic);
+ throw new SamzaException(msg);
+ }
+ List<TopicPartition> topicPartitions = partitionInfos.stream()
+ .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
+ .collect(Collectors.toList());
+ return fetchTopicPartitionsMetadata(topicPartitions);
+ });
allOldestOffsets.putAll(offsetsForTopic.getOldestOffsets());
allNewestOffsets.putAll(offsetsForTopic.getNewestOffsets());
allUpcomingOffsets.putAll(offsetsForTopic.getUpcomingOffsets());
@@ -516,7 +516,7 @@ public class KafkaSystemAdmin implements SystemAdmin {
@Override
public boolean createStream(StreamSpec streamSpec) {
LOG.info("Creating Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName());
- final String REPL_FACTOR = "replication.factor";
+ final String replFactor = "replication.factor";
KafkaStreamSpec kafkaStreamSpec = toKafkaSpec(streamSpec);
String topicName = kafkaStreamSpec.getPhysicalName();
@@ -527,11 +527,11 @@ public class KafkaSystemAdmin implements SystemAdmin {
// specify the configs
Map<String, String> streamConfig = new HashMap<>(kafkaStreamSpec.getConfig());
// HACK - replication.factor is invalid config for AdminClient.createTopics
- if (streamConfig.containsKey(REPL_FACTOR)) {
- String repl = streamConfig.get(REPL_FACTOR);
+ if (streamConfig.containsKey(replFactor)) {
+ String repl = streamConfig.get(replFactor);
LOG.warn("Configuration {}={} for topic={} is invalid. Using kSpec repl factor {}",
- REPL_FACTOR, repl, kafkaStreamSpec.getPhysicalName(), kafkaStreamSpec.getReplicationFactor());
- streamConfig.remove(REPL_FACTOR);
+ replFactor, repl, kafkaStreamSpec.getPhysicalName(), kafkaStreamSpec.getReplicationFactor());
+ streamConfig.remove(replFactor);
}
newTopic.configs(new MapConfig(streamConfig));
CreateTopicsResult result = adminClient.createTopics(ImmutableSet.of(newTopic));
@@ -653,8 +653,8 @@ public class KafkaSystemAdmin implements SystemAdmin {
Map<TopicPartition, RecordsToDelete> recordsToDelete = offsets.entrySet()
.stream()
.collect(Collectors.toMap(entry ->
- new TopicPartition(entry.getKey().getStream(), entry.getKey().getPartition().getPartitionId()),
- entry -> RecordsToDelete.beforeOffset(Long.parseLong(entry.getValue()) + 1)));
+ new TopicPartition(entry.getKey().getStream(), entry.getKey().getPartition().getPartitionId()),
+ entry -> RecordsToDelete.beforeOffset(Long.parseLong(entry.getValue()) + 1)));
adminClient.deleteRecords(recordsToDelete).all().whenComplete((ignored, exception) -> {
if (exception != null) {
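[Editor's note: nearly all of the KafkaSystemAdmin churn above is re-indentation of anonymous inner classes, presumably driven by the stock Indentation check: the body of an anonymous class assigned in a wrapped statement is indented as a continuation of that statement rather than aligned with the enclosing block. In miniature:

    import java.util.concurrent.Callable;

    public class IndentationExample {
      Callable<String> fallback() {
        // Continuation style as enforced above: the anonymous class body is
        // indented relative to the start of the declaration.
        Callable<String> op =
            new Callable<String>() {
              @Override
              public String call() {
                return "fallback";
              }
            };
        return op;
      }
    }
]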
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
index 3974e02..27bd638 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -1,6 +1,4 @@
-
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,9 +15,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-
package org.apache.samza.system.kafka;
import java.util.HashMap;
@@ -158,7 +154,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
void startConsumer() {
// set the offset for each TopicPartition
if (topicPartitionsToOffset.size() <= 0) {
- LOG.error ("{}: Consumer is not subscribed to any SSPs", this);
+ LOG.error("{}: Consumer is not subscribed to any SSPs", this);
}
topicPartitionsToOffset.forEach((topicPartition, startingOffsetString) -> {
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java
index 8c4d48b..bf42320 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java
@@ -228,7 +228,7 @@ public class KafkaSystemDescriptor extends SystemDescriptor<KafkaSystemDescripto
@Override
public Map<String, String> toConfig() {
Map<String, String> configs = new HashMap<>(super.toConfig());
- if(!consumerZkConnect.isEmpty()) {
+ if (!consumerZkConnect.isEmpty()) {
configs.put(String.format(CONSUMER_ZK_CONNECT_CONFIG_KEY, getSystemName()), String.join(",", consumerZkConnect));
}
consumerAutoOffsetResetOptional.ifPresent(consumerAutoOffsetReset ->
diff --git a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
index 4e52ced..1280e79 100644
--- a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
@@ -196,7 +196,7 @@ public class TestKafkaCheckpointManagerJava {
// mock out a consumer that returns ten checkpoint IMEs for the same ssp
List<List<IncomingMessageEnvelope>> pollOutputs = new ArrayList<>();
- for(int offset = oldestOffset; offset <= newestOffset; offset++) {
+ for (int offset = oldestOffset; offset <= newestOffset; offset++) {
pollOutputs.add(ImmutableList.of(newCheckpointEnvelope(TASK1, ssp, Integer.toString(offset))));
}
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
index 95676b7..a28b23e 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
@@ -41,14 +41,13 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.test.TestUtils;
public class MockKafkaProducer implements Producer<byte[], byte[]> {
- private Cluster _cluster;
- private List<FutureTask<RecordMetadata>> _callbacksList = new ArrayList<FutureTask<RecordMetadata>>();
+ private Cluster cluster;
+ private List<FutureTask<RecordMetadata>> callbacksList = new ArrayList<FutureTask<RecordMetadata>>();
private boolean shouldBuffer = false;
private boolean errorNext = false;
private boolean errorInCallback = true;
@@ -70,7 +69,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
* - "Offset" in RecordMetadata is not guranteed to be correct
*/
public MockKafkaProducer(int numNodes, String topicName, int numPartitions) {
- this._cluster = TestUtils.clusterWith(numNodes, topicName, numPartitions);
+ this.cluster = TestUtils.clusterWith(numNodes, topicName, numPartitions);
}
public void setShouldBuffer(boolean shouldBuffer) {
@@ -110,7 +109,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
if (errorNext) {
if (!errorInCallback) {
this.errorNext = false;
- throw (RuntimeException)exception;
+ throw (RuntimeException) exception;
}
if (shouldBuffer) {
FutureTask<RecordMetadata> f = new FutureTask<RecordMetadata>(new Callable<RecordMetadata>() {
@@ -121,7 +120,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
return getRecordMetadata(record);
}
});
- _callbacksList.add(f);
+ callbacksList.add(f);
this.errorNext = false;
return f;
} else {
@@ -141,7 +140,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
return metadata;
}
});
- _callbacksList.add(f);
+ callbacksList.add(f);
return f;
} else {
int offset = msgsSent.incrementAndGet();
@@ -154,7 +153,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
@Override
public List<PartitionInfo> partitionsFor(String topic) {
- return this._cluster.partitionsForTopic(topic);
+ return this.cluster.partitionsForTopic(topic);
}
@Override
@@ -187,7 +186,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
return openCount;
}
- public synchronized void flush () {
+ public synchronized void flush() {
new FlushRunnable(0).run();
}
@@ -248,11 +247,11 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
private static class FutureSuccess implements Future<RecordMetadata> {
private ProducerRecord record;
- private final RecordMetadata _metadata;
+ private final RecordMetadata metadata;
public FutureSuccess(ProducerRecord record, int offset) {
this.record = record;
- this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset, RecordBatch.NO_TIMESTAMP, -1L, -1, -1);
+ this.metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset, RecordBatch.NO_TIMESTAMP, -1L, -1, -1);
}
@Override
@@ -262,12 +261,12 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
@Override
public RecordMetadata get() throws ExecutionException {
- return this._metadata;
+ return this.metadata;
}
@Override
public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
- return this._metadata;
+ return this.metadata;
}
@Override
@@ -282,21 +281,21 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
}
private class FlushRunnable implements Runnable {
- private final int _sleepTime;
+ private final int sleepTime;
public FlushRunnable(int sleepTime) {
- _sleepTime = sleepTime;
+ this.sleepTime = sleepTime;
}
public void run() {
- FutureTask[] callbackArray = new FutureTask[_callbacksList.size()];
- AtomicReferenceArray<FutureTask> _bufferList =
- new AtomicReferenceArray<FutureTask>(_callbacksList.toArray(callbackArray));
+ FutureTask[] callbackArray = new FutureTask[callbacksList.size()];
+ AtomicReferenceArray<FutureTask> bufferList =
+ new AtomicReferenceArray<FutureTask>(callbacksList.toArray(callbackArray));
ExecutorService executor = Executors.newFixedThreadPool(10);
try {
- for (int i = 0; i < _bufferList.length(); i++) {
- Thread.sleep(_sleepTime);
- FutureTask f = _bufferList.get(i);
+ for (int i = 0; i < bufferList.length(); i++) {
+ Thread.sleep(sleepTime);
+ FutureTask f = bufferList.get(i);
if (!f.isDone()) {
executor.submit(f).get();
}
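[Editor's note: the MockKafkaProducer changes drop the leading-underscore field convention (_cluster, _callbacksList, _metadata, _sleepTime). The stock MemberName check's default pattern (^[a-z][a-zA-Z0-9]*$) forbids underscores in instance fields, so the fields are renamed and disambiguated with this. where a parameter shares the name. Sketch:

    import java.util.ArrayList;
    import java.util.List;

    public class MemberNameExample {
      // private List<String> _items;  // flagged: MemberName forbids the '_' prefix
      private final List<String> items = new ArrayList<>();

      public MemberNameExample(List<String> items) {
        // With the prefix gone, 'this.' separates the field from the parameter.
        this.items.addAll(items);
      }
    }
]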
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaCheckpointManagerFactory.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaCheckpointManagerFactory.java
index 1846ea8..2494fcc 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaCheckpointManagerFactory.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaCheckpointManagerFactory.java
@@ -19,7 +19,6 @@
package org.apache.samza.system.kafka;
-import org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.KafkaConfig;
import org.apache.samza.config.MapConfig;
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
index 14d2fe6..8a48283 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
@@ -22,19 +22,19 @@ import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.Properties;
import org.apache.samza.system.StreamSpec;
-import org.apache.samza.util.TestStreamUtil;
import org.junit.Test;
import static org.junit.Assert.*;
/**
- * See also the general StreamSpec tests in {@link TestStreamUtil}
+ * See also the general StreamSpec tests in {@link org.apache.samza.util.TestStreamUtil}
*/
public class TestKafkaStreamSpec {
@Test
public void testUnsupportedConfigStrippedFromProperties() {
- StreamSpec original = new StreamSpec("dummyId","dummyPhysicalName", "dummySystemName", ImmutableMap.of("segment.bytes", "4", "replication.factor", "7"));
+ StreamSpec original = new StreamSpec("dummyId", "dummyPhysicalName", "dummySystemName",
+ ImmutableMap.of("segment.bytes", "4", "replication.factor", "7"));
// First verify the original
assertEquals("7", original.get("replication.factor"));
@@ -61,6 +61,6 @@ public class TestKafkaStreamSpec {
@Test(expected = IllegalArgumentException.class)
public void testInvalidPartitionCount() {
- new KafkaStreamSpec("dummyId","dummyPhysicalName", "dummySystemName", 0);
+ new KafkaStreamSpec("dummyId", "dummyPhysicalName", "dummySystemName", 0);
}
}
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index 82d635f..cb8f34d 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -22,12 +22,10 @@ package org.apache.samza.system.kafka;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -93,7 +91,7 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
configResourceConfigMap.values().forEach(configEntry -> {
configEntry.entries().forEach(config -> {
- kafkaTopicConfig.put(config.name(), config.value());
+ kafkaTopicConfig.put(config.name(), config.value());
});
});
@@ -229,7 +227,7 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
@Test
public void testCreateCoordinatorStreamWithSpecialCharsInTopicName() {
- final String STREAM = "test.coordinator_test.Stream";
+ final String stream = "test.coordinator_test.Stream";
Map<String, String> map = new HashMap<>();
map.put("job.coordinator.segment.bytes", "123");
@@ -239,14 +237,14 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
String.valueOf(coordReplicatonFactor));
KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(SYSTEM, map));
- StreamSpec spec = StreamSpec.createCoordinatorStreamSpec(STREAM, SYSTEM);
+ StreamSpec spec = StreamSpec.createCoordinatorStreamSpec(stream, SYSTEM);
Mockito.doAnswer(invocationOnMock -> {
StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod();
assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor
assertTrue(internalSpec.isCoordinatorStream());
assertEquals(SYSTEM, internalSpec.getSystemName());
- assertEquals(STREAM, internalSpec.getPhysicalName());
+ assertEquals(stream, internalSpec.getPhysicalName());
assertEquals(1, internalSpec.getPartitionCount());
Assert.assertEquals(coordReplicatonFactor, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
Assert.assertEquals("123", ((KafkaStreamSpec) internalSpec).getProperties().getProperty("segment.bytes"));
@@ -272,16 +270,16 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
}
public void testCreateChangelogStreamHelp(final String topic) {
- final int PARTITIONS = 12;
- final int REP_FACTOR = 2;
+ final int partitions = 12;
+ final int repFactor = 2;
Map<String, String> map = new HashMap<>();
map.put(JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM);
map.put(String.format("stores.%s.changelog", "fakeStore"), topic);
- map.put(String.format("stores.%s.changelog.replication.factor", "fakeStore"), String.valueOf(REP_FACTOR));
+ map.put(String.format("stores.%s.changelog.replication.factor", "fakeStore"), String.valueOf(repFactor));
map.put(String.format("stores.%s.changelog.kafka.segment.bytes", "fakeStore"), "139");
KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(SYSTEM, map));
- StreamSpec spec = StreamSpec.createChangeLogStreamSpec(topic, SYSTEM, PARTITIONS);
+ StreamSpec spec = StreamSpec.createChangeLogStreamSpec(topic, SYSTEM, partitions);
Mockito.doAnswer(invocationOnMock -> {
StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod();
@@ -289,8 +287,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
assertTrue(internalSpec.isChangeLogStream());
assertEquals(SYSTEM, internalSpec.getSystemName());
assertEquals(topic, internalSpec.getPhysicalName());
- assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
- assertEquals(PARTITIONS, internalSpec.getPartitionCount());
+ assertEquals(repFactor, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
+ assertEquals(partitions, internalSpec.getPartitionCount());
assertEquals("139", ((KafkaStreamSpec) internalSpec).getProperties().getProperty("segment.bytes"));
assertEquals("compact", ((KafkaStreamSpec) internalSpec).getProperties().getProperty("cleanup.policy"));
@@ -366,7 +364,7 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
}
@Test
- public void testShouldAssembleMetadata () {
+ public void testShouldAssembleMetadata() {
Map<SystemStreamPartition, String> oldestOffsets = new ImmutableMap.Builder<SystemStreamPartition, String>()
.put(new SystemStreamPartition(SYSTEM, "stream1", new Partition(0)), "o1")
.put(new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)), "o2")
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java
index 25cab8c..cd1e707 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java
@@ -336,7 +336,7 @@ public class TestKafkaSystemAdminWithMock {
}
@Test(expected = SamzaException.class)
- public void testGetSSPMetadataShouldTerminateAfterFiniteRetriesOnException() throws Exception{
+ public void testGetSSPMetadataShouldTerminateAfterFiniteRetriesOnException() throws Exception {
SystemStreamPartition oneSSP = new SystemStreamPartition(TEST_SYSTEM, VALID_TOPIC, new Partition(0));
SystemStreamPartition otherSSP = new SystemStreamPartition(TEST_SYSTEM, "otherTopic", new Partition(1));
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
index f5b1e8e..dd20248 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,9 +15,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-
package org.apache.samza.system.kafka;
import java.util.HashMap;
@@ -173,7 +170,7 @@ public class TestKafkaSystemConsumer {
int partitionsNum = 2;
int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // fake size, upto the limit
int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 100; // fake size, below the limit
- int ime11Size = 20;// event with the second message still below the size limit
+ int ime11Size = 20; // event with the second message still below the size limit
ByteArraySerializer bytesSerde = new ByteArraySerializer();
IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", bytesSerde.serialize("", "key0".getBytes()),
bytesSerde.serialize("", "value0".getBytes()), ime0Size);
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java
index 03b0564..5cc6f84 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,9 +15,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-
package org.apache.samza.system.kafka;
import java.util.HashMap;
@@ -96,7 +93,7 @@ public class TestKafkaSystemConsumerMetrics {
protected static void validate(Map<String, Metric> metricMap, Map<String, String> expectedValues) {
// match the expected value, set in the test above, and the value in the metrics
- for(Map.Entry<String, String> e: expectedValues.entrySet()) {
+ for (Map.Entry<String, String> e : expectedValues.entrySet()) {
String metricName = e.getKey();
String expectedValue = e.getValue();
// get the metric from the registry
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
index 7fc450d..73673d3 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
@@ -27,7 +27,6 @@ import org.apache.samza.util.ExponentialSleepStrategy;
import org.junit.Test;
import scala.runtime.AbstractFunction0;
-import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue;
@@ -38,19 +37,19 @@ public class TestKafkaSystemProducerJava {
@Test
public void testInstantiateProducer() {
KafkaSystemProducer ksp = new KafkaSystemProducer("SysName", new ExponentialSleepStrategy(2.0, 200, 10000),
- new AbstractFunction0<Producer<byte[], byte[]>>() {
- @Override
- public Producer<byte[], byte[]> apply() {
- return new KafkaProducer<>(new HashMap<String, Object>());
- }
- }, new KafkaSystemProducerMetrics("SysName", new MetricsRegistryMap()), new AbstractFunction0<Object>() {
- @Override
- public Object apply() {
- return System.currentTimeMillis();
- }
- }, false);
+ new AbstractFunction0<Producer<byte[], byte[]>>() {
+ @Override
+ public Producer<byte[], byte[]> apply() {
+ return new KafkaProducer<>(new HashMap<String, Object>());
+ }
+ }, new KafkaSystemProducerMetrics("SysName", new MetricsRegistryMap()), new AbstractFunction0<Object>() {
+ @Override
+ public Object apply() {
+ return System.currentTimeMillis();
+ }
+ }, false);
long now = System.currentTimeMillis();
- assertTrue((Long)ksp.clock().apply() >= now);
+ assertTrue((Long) ksp.clock().apply() >= now);
}
}
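[Editor's note: the deleted static import of assertEquals is the plain UnusedImports case (no javadoc involved): the test only calls assertTrue, so the import is dead. A hypothetical sketch:

    // import static junit.framework.Assert.assertEquals;  // flagged by UnusedImports: never referenced
    import static junit.framework.Assert.assertTrue;
    import org.junit.Test;

    public class UnusedImportExample {
      @Test
      public void testClockIsMonotonic() {
        long before = System.currentTimeMillis();
        assertTrue(System.currentTimeMillis() >= before);
      }
    }
]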
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/descriptors/TestKafkaInputDescriptor.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/descriptors/TestKafkaInputDescriptor.java
index 5bce72d..3992e90 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/descriptors/TestKafkaInputDescriptor.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/descriptors/TestKafkaInputDescriptor.java
@@ -25,12 +25,9 @@ import org.apache.samza.operators.KV;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
public class TestKafkaInputDescriptor {
@Test
@@ -42,7 +39,7 @@ public class TestKafkaInputDescriptor {
.withConsumerAutoOffsetReset("largest")
.withConsumerFetchMessageMaxBytes(1024 * 1024);
- Map<String, String> generatedConfigs = isd.toConfig();;
+ Map<String, String> generatedConfigs = isd.toConfig();
assertEquals("kafka", generatedConfigs.get("streams.input-stream.samza.system"));
assertEquals("largest", generatedConfigs.get("systems.kafka.streams.input-stream.consumer.auto.offset.reset"));
assertEquals("1048576", generatedConfigs.get("systems.kafka.streams.input-stream.consumer.fetch.message.max.bytes"));
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/descriptors/TestKafkaSystemDescriptor.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/descriptors/TestKafkaSystemDescriptor.java
index 31469f8..2a31198 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/descriptors/TestKafkaSystemDescriptor.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/descriptors/TestKafkaSystemDescriptor.java
@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -37,7 +36,7 @@ public class TestKafkaSystemDescriptor {
.withProducerBootstrapServers(ImmutableList.of("localhost:567", "localhost:890"))
.withDefaultStreamOffsetDefault(SystemStreamMetadata.OffsetType.OLDEST)
.withConsumerAutoOffsetReset("smallest")
- .withConsumerFetchMessageMaxBytes(1024*1024)
+ .withConsumerFetchMessageMaxBytes(1024 * 1024)
.withSamzaFetchThreshold(10000)
.withSamzaFetchThresholdBytes(1024 * 1024)
.withConsumerConfigs(ImmutableMap.of("custom-consumer-config-key", "custom-consumer-config-value"))