You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/04/06 22:59:23 UTC

kafka git commit: KAFKA-4995; Fix remaining findbugs warnings in Kafka Streams

Repository: kafka
Updated Branches:
  refs/heads/trunk 76a10e23f -> 2f4f3b957


KAFKA-4995; Fix remaining findbugs warnings in Kafka Streams

Author: Colin P. Mccabe <cm...@confluent.io>

Reviewers: Eno Thereska <en...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #2780 from cmccabe/KAFKA-4995


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2f4f3b95
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2f4f3b95
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2f4f3b95

Branch: refs/heads/trunk
Commit: 2f4f3b957d753debd8d10c98a2c867211064503e
Parents: 76a10e2
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Thu Apr 6 23:59:16 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Apr 6 23:59:16 2017 +0100

----------------------------------------------------------------------
 gradle/findbugs-exclude.xml                     | 24 +++++++++++++++++
 .../kstream/internals/KStreamWindowReduce.java  |  6 ++---
 .../streams/processor/TopologyBuilder.java      | 28 +++++++++-----------
 .../internals/ProcessorStateManager.java        |  5 ++--
 .../internals/StreamPartitionAssignor.java      | 15 +++++++++++
 .../processor/internals/StreamThread.java       | 13 ++++-----
 .../streams/state/internals/MemoryLRUCache.java |  2 +-
 7 files changed, 65 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2f4f3b95/gradle/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/gradle/findbugs-exclude.xml b/gradle/findbugs-exclude.xml
index cdb6894..0a64e5f 100644
--- a/gradle/findbugs-exclude.xml
+++ b/gradle/findbugs-exclude.xml
@@ -145,4 +145,28 @@
         <Class name="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"/>
         <Bug pattern="IS2_INCONSISTENT_SYNC"/>
     </Match>
+
+    <Match>
+        <!-- Suppress warnings about implementing compareTo but not equals. -->
+        <Class name="org.apache.kafka.streams.processor.internals.Stamped"/>
+        <Bug pattern="EQ_COMPARETO_USE_OBJECT_EQUALS"/>
+    </Match>
+
+    <Match>
+        <!-- TODO: fix this (see KAFKA-4996) -->
+        <Class name="org.apache.kafka.streams.state.internals.Segments"/>
+        <Method name="getOrCreateSegment"/>
+        <Bug pattern="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION"/>
+    </Match>
+
+    <Match>
+        <!-- TODO: fix this (see KAFKA-4996) -->
+        <Or>
+            <Package name="org.apache.kafka.streams.state.internals"/>
+            <Package name="org.apache.kafka.streams.processor.internals"/>
+            <Package name="org.apache.kafka.streams.processor"/>
+            <Package name="org.apache.kafka.streams"/>
+        </Or>
+        <Bug pattern="IS2_INCONSISTENT_SYNC"/>
+    </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f4f3b95/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index 3f4442c..c20601a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -114,9 +114,9 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
             }
 
             // create the new window for the rest of unmatched window that do not exist yet
-            for (long windowStartMs : matchedWindows.keySet()) {
-                windowStore.put(key, value, windowStartMs);
-                tupleForwarder.maybeForward(new Windowed<>(key, matchedWindows.get(windowStartMs)), value, null);
+            for (final Map.Entry<Long, W> entry : matchedWindows.entrySet()) {
+                windowStore.put(key, value, entry.getKey());
+                tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), value, null);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f4f3b95/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 7c2ec4f..8b55bc6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -689,14 +689,12 @@ public class TopologyBuilder {
         if (nodeFactories.containsKey(name))
             throw new TopologyBuilderException("Processor " + name + " is already added.");
 
-        if (parentNames != null) {
-            for (String parent : parentNames) {
-                if (parent.equals(name)) {
-                    throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself.");
-                }
-                if (!nodeFactories.containsKey(parent)) {
-                    throw new TopologyBuilderException("Parent processor " + parent + " is not added yet.");
-                }
+        for (final String parent : parentNames) {
+            if (parent.equals(name)) {
+                throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself.");
+            }
+            if (!nodeFactories.containsKey(parent)) {
+                throw new TopologyBuilderException("Parent processor " + parent + " is not added yet.");
             }
         }
 
@@ -723,14 +721,12 @@ public class TopologyBuilder {
         if (nodeFactories.containsKey(name))
             throw new TopologyBuilderException("Processor " + name + " is already added.");
 
-        if (parentNames != null) {
-            for (String parent : parentNames) {
-                if (parent.equals(name)) {
-                    throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself.");
-                }
-                if (!nodeFactories.containsKey(parent)) {
-                    throw new TopologyBuilderException("Parent processor " + parent + " is not added yet.");
-                }
+        for (final String parent : parentNames) {
+            if (parent.equals(name)) {
+                throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself.");
+            }
+            if (!nodeFactories.containsKey(parent)) {
+                throw new TopologyBuilderException("Parent processor " + parent + " is not added yet.");
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f4f3b95/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 0e48ddd..4b7bb1f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -273,10 +273,11 @@ public class ProcessorStateManager implements StateManager {
     @Override
     public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
         checkpointedOffsets.putAll(changelogReader.restoredOffsets());
-        for (String storeName : stores.keySet()) {
+        for (final Map.Entry<String, StateStore> entry : stores.entrySet()) {
+            String storeName = entry.getKey();
             // only checkpoint the offset to the offsets file if
             // it is persistent AND changelog enabled
-            if (stores.get(storeName).persistent() && storeToChangelogTopic.containsKey(storeName)) {
+            if (entry.getValue().persistent() && storeToChangelogTopic.containsKey(storeName)) {
                 final String changelogTopic = storeToChangelogTopic.get(storeName);
                 final TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName));
                 if (ackedOffsets.containsKey(topicPartition)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f4f3b95/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 004926f..5c24801 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -75,6 +75,21 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         public int compareTo(final AssignedPartition that) {
             return PARTITION_COMPARATOR.compare(this.partition, that.partition);
         }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof AssignedPartition)) {
+                return false;
+            }
+            AssignedPartition other = (AssignedPartition) o;
+            return compareTo(other) == 0;
+        }
+
+        @Override
+        public int hashCode() {
+            // Only partition is important for compareTo, equals and hashCode.
+            return partition.hashCode();
+        }
     }
 
     private static class ClientMetadata {

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f4f3b95/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index c700cad..03a9789 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -726,8 +726,10 @@ public class StreamThread extends Thread {
                 if (!standbyRecords.isEmpty()) {
                     Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> remainingStandbyRecords = new HashMap<>();
 
-                    for (TopicPartition partition : standbyRecords.keySet()) {
-                        List<ConsumerRecord<byte[], byte[]>> remaining = standbyRecords.get(partition);
+                    for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> entry :
+                            standbyRecords.entrySet()) {
+                        TopicPartition partition = entry.getKey();
+                        List<ConsumerRecord<byte[], byte[]>> remaining = entry.getValue();
                         if (remaining != null) {
                             StandbyTask task = standbyTasksByPartition.get(partition);
                             remaining = task.update(partition, remaining);
@@ -1151,8 +1153,8 @@ public class StreamThread extends Thread {
         // iterate and print active tasks
         if (activeTasks != null) {
             sb.append(indent).append("\tActive tasks:\n");
-            for (TaskId tId : activeTasks.keySet()) {
-                StreamTask task = activeTasks.get(tId);
+            for (final Map.Entry<TaskId, StreamTask> entry : activeTasks.entrySet()) {
+                StreamTask task = entry.getValue();
                 sb.append(indent).append(task.toString(indent + "\t\t"));
             }
         }
@@ -1160,8 +1162,7 @@ public class StreamThread extends Thread {
         // iterate and print standby tasks
         if (standbyTasks != null) {
             sb.append(indent).append("\tStandby tasks:\n");
-            for (TaskId tId : standbyTasks.keySet()) {
-                StandbyTask task = standbyTasks.get(tId);
+            for (StandbyTask task : standbyTasks.values()) {
                 sb.append(indent).append(task.toString(indent + "\t\t"));
             }
             sb.append("\n");

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f4f3b95/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index e6bba54..6429f62 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -75,7 +75,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
 
             @Override
             protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
-                if (size() > maxCacheSize) {
+                if (super.size() > maxCacheSize) {
                     K key = eldest.getKey();
                     if (listener != null) listener.apply(key, eldest.getValue());
                     return true;