You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/04/06 22:59:23 UTC
kafka git commit: KAFKA-4995; Fix remaining findbugs warnings in Kafka Streams
Repository: kafka
Updated Branches:
refs/heads/trunk 76a10e23f -> 2f4f3b957
KAFKA-4995; Fix remaining findbugs warnings in Kafka Streams
Author: Colin P. Mccabe <cm...@confluent.io>
Reviewers: Eno Thereska <en...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Ismael Juma <is...@juma.me.uk>
Closes #2780 from cmccabe/KAFKA-4995
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2f4f3b95
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2f4f3b95
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2f4f3b95
Branch: refs/heads/trunk
Commit: 2f4f3b957d753debd8d10c98a2c867211064503e
Parents: 76a10e2
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Thu Apr 6 23:59:16 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Apr 6 23:59:16 2017 +0100
----------------------------------------------------------------------
gradle/findbugs-exclude.xml | 24 +++++++++++++++++
.../kstream/internals/KStreamWindowReduce.java | 6 ++---
.../streams/processor/TopologyBuilder.java | 28 +++++++++-----------
.../internals/ProcessorStateManager.java | 5 ++--
.../internals/StreamPartitionAssignor.java | 15 +++++++++++
.../processor/internals/StreamThread.java | 13 ++++-----
.../streams/state/internals/MemoryLRUCache.java | 2 +-
7 files changed, 65 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2f4f3b95/gradle/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/gradle/findbugs-exclude.xml b/gradle/findbugs-exclude.xml
index cdb6894..0a64e5f 100644
--- a/gradle/findbugs-exclude.xml
+++ b/gradle/findbugs-exclude.xml
@@ -145,4 +145,28 @@
<Class name="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"/>
<Bug pattern="IS2_INCONSISTENT_SYNC"/>
</Match>
+
+ <Match>
+ <!-- Suppress warnings about implementing compareTo but not equals. -->
+ <Class name="org.apache.kafka.streams.processor.internals.Stamped"/>
+ <Bug pattern="EQ_COMPARETO_USE_OBJECT_EQUALS"/>
+ </Match>
+
+ <Match>
+ <!-- TODO: fix this (see KAFKA-4996) -->
+ <Class name="org.apache.kafka.streams.state.internals.Segments"/>
+ <Method name="getOrCreateSegment"/>
+ <Bug pattern="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION"/>
+ </Match>
+
+ <Match>
+ <!-- TODO: fix this (see KAFKA-4996) -->
+ <Or>
+ <Package name="org.apache.kafka.streams.state.internals"/>
+ <Package name="org.apache.kafka.streams.processor.internals"/>
+ <Package name="org.apache.kafka.streams.processor"/>
+ <Package name="org.apache.kafka.streams"/>
+ </Or>
+ <Bug pattern="IS2_INCONSISTENT_SYNC"/>
+ </Match>
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/kafka/blob/2f4f3b95/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index 3f4442c..c20601a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -114,9 +114,9 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
}
// create the new window for the rest of unmatched window that do not exist yet
- for (long windowStartMs : matchedWindows.keySet()) {
- windowStore.put(key, value, windowStartMs);
- tupleForwarder.maybeForward(new Windowed<>(key, matchedWindows.get(windowStartMs)), value, null);
+ for (final Map.Entry<Long, W> entry : matchedWindows.entrySet()) {
+ windowStore.put(key, value, entry.getKey());
+ tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), value, null);
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2f4f3b95/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 7c2ec4f..8b55bc6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -689,14 +689,12 @@ public class TopologyBuilder {
if (nodeFactories.containsKey(name))
throw new TopologyBuilderException("Processor " + name + " is already added.");
- if (parentNames != null) {
- for (String parent : parentNames) {
- if (parent.equals(name)) {
- throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself.");
- }
- if (!nodeFactories.containsKey(parent)) {
- throw new TopologyBuilderException("Parent processor " + parent + " is not added yet.");
- }
+ for (final String parent : parentNames) {
+ if (parent.equals(name)) {
+ throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself.");
+ }
+ if (!nodeFactories.containsKey(parent)) {
+ throw new TopologyBuilderException("Parent processor " + parent + " is not added yet.");
}
}
@@ -723,14 +721,12 @@ public class TopologyBuilder {
if (nodeFactories.containsKey(name))
throw new TopologyBuilderException("Processor " + name + " is already added.");
- if (parentNames != null) {
- for (String parent : parentNames) {
- if (parent.equals(name)) {
- throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself.");
- }
- if (!nodeFactories.containsKey(parent)) {
- throw new TopologyBuilderException("Parent processor " + parent + " is not added yet.");
- }
+ for (final String parent : parentNames) {
+ if (parent.equals(name)) {
+ throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself.");
+ }
+ if (!nodeFactories.containsKey(parent)) {
+ throw new TopologyBuilderException("Parent processor " + parent + " is not added yet.");
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2f4f3b95/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 0e48ddd..4b7bb1f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -273,10 +273,11 @@ public class ProcessorStateManager implements StateManager {
@Override
public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
checkpointedOffsets.putAll(changelogReader.restoredOffsets());
- for (String storeName : stores.keySet()) {
+ for (final Map.Entry<String, StateStore> entry : stores.entrySet()) {
+ String storeName = entry.getKey();
// only checkpoint the offset to the offsets file if
// it is persistent AND changelog enabled
- if (stores.get(storeName).persistent() && storeToChangelogTopic.containsKey(storeName)) {
+ if (entry.getValue().persistent() && storeToChangelogTopic.containsKey(storeName)) {
final String changelogTopic = storeToChangelogTopic.get(storeName);
final TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName));
if (ackedOffsets.containsKey(topicPartition)) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/2f4f3b95/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 004926f..5c24801 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -75,6 +75,21 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
public int compareTo(final AssignedPartition that) {
return PARTITION_COMPARATOR.compare(this.partition, that.partition);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof AssignedPartition)) {
+ return false;
+ }
+ AssignedPartition other = (AssignedPartition) o;
+ return compareTo(other) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ // Only partition is important for compareTo, equals and hashCode.
+ return partition.hashCode();
+ }
}
private static class ClientMetadata {
http://git-wip-us.apache.org/repos/asf/kafka/blob/2f4f3b95/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index c700cad..03a9789 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -726,8 +726,10 @@ public class StreamThread extends Thread {
if (!standbyRecords.isEmpty()) {
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> remainingStandbyRecords = new HashMap<>();
- for (TopicPartition partition : standbyRecords.keySet()) {
- List<ConsumerRecord<byte[], byte[]>> remaining = standbyRecords.get(partition);
+ for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> entry :
+ standbyRecords.entrySet()) {
+ TopicPartition partition = entry.getKey();
+ List<ConsumerRecord<byte[], byte[]>> remaining = entry.getValue();
if (remaining != null) {
StandbyTask task = standbyTasksByPartition.get(partition);
remaining = task.update(partition, remaining);
@@ -1151,8 +1153,8 @@ public class StreamThread extends Thread {
// iterate and print active tasks
if (activeTasks != null) {
sb.append(indent).append("\tActive tasks:\n");
- for (TaskId tId : activeTasks.keySet()) {
- StreamTask task = activeTasks.get(tId);
+ for (final Map.Entry<TaskId, StreamTask> entry : activeTasks.entrySet()) {
+ StreamTask task = entry.getValue();
sb.append(indent).append(task.toString(indent + "\t\t"));
}
}
@@ -1160,8 +1162,7 @@ public class StreamThread extends Thread {
// iterate and print standby tasks
if (standbyTasks != null) {
sb.append(indent).append("\tStandby tasks:\n");
- for (TaskId tId : standbyTasks.keySet()) {
- StandbyTask task = standbyTasks.get(tId);
+ for (StandbyTask task : standbyTasks.values()) {
sb.append(indent).append(task.toString(indent + "\t\t"));
}
sb.append("\n");
http://git-wip-us.apache.org/repos/asf/kafka/blob/2f4f3b95/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index e6bba54..6429f62 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -75,7 +75,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
- if (size() > maxCacheSize) {
+ if (super.size() > maxCacheSize) {
K key = eldest.getKey();
if (listener != null) listener.apply(key, eldest.getValue());
return true;