You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/04/04 00:57:18 UTC

kafka git commit: KAFKA-4977: Fix findbugs issues in connect/runtime

Repository: kafka
Updated Branches:
  refs/heads/trunk ca2979f84 -> f812a8fd9


KAFKA-4977: Fix findbugs issues in connect/runtime

Author: Colin P. Mccabe <cm...@confluent.io>

Reviewers: Konstantine Karantasis <ko...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #2763 from cmccabe/KAFKA-4977


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f812a8fd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f812a8fd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f812a8fd

Branch: refs/heads/trunk
Commit: f812a8fd93c37cb27db07505dd5f8f29463906a1
Parents: ca2979f
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Mon Apr 3 17:57:12 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Apr 3 17:57:12 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/connect/runtime/Worker.java    |  6 ++--
 .../runtime/distributed/DistributedHerder.java  | 15 +++++++++
 .../rest/entities/ConnectorStateInfo.java       | 16 +++++++++
 .../kafka/connect/tools/SchemaSourceTask.java   |  6 ++--
 .../kafka/connect/util/ConnectorTaskId.java     |  2 +-
 gradle/findbugs-exclude.xml                     | 34 ++++++++++++++++++++
 6 files changed, 71 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f812a8fd/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 1801e1b..400ae08 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -102,9 +102,9 @@ public class Worker {
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
         // These settings are designed to ensure there is no data loss. They *may* be overridden via configs passed to the
         // worker, but this may compromise the delivery guarantees of Kafka Connect.
-        producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, ((Integer) Integer.MAX_VALUE).toString());
-        producerProps.put(ProducerConfig.RETRIES_CONFIG, ((Integer) Integer.MAX_VALUE).toString());
-        producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, ((Long) Long.MAX_VALUE).toString());
+        producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
+        producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
+        producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
         producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
         producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
         // User-specified overrides

http://git-wip-us.apache.org/repos/asf/kafka/blob/f812a8fd/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index cf30aca..e908d0b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -54,6 +54,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentSkipListSet;
@@ -1116,6 +1117,20 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             final int cmp = Long.compare(at, o.at);
             return cmp == 0 ? Long.compare(seq, o.seq) : cmp;
         }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof HerderRequest))
+                return false;
+            HerderRequest other = (HerderRequest) o;
+            return compareTo(other) == 0;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(at, seq);
+        }
     }
 
     private static final Callback<Void> forwardErrorCallback(final Callback<?> callback) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f812a8fd/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
index defe2bb..c19e20b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 import java.util.List;
+import java.util.Objects;
 
 public class ConnectorStateInfo {
 
@@ -103,6 +104,21 @@ public class ConnectorStateInfo {
         public int compareTo(TaskState that) {
             return Integer.compare(this.id, that.id);
         }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == this)
+                return true;
+            if (!(o instanceof TaskState))
+                return false;
+            TaskState other = (TaskState) o;
+            return compareTo(other) == 0;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(id);
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f812a8fd/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
index 2955fb5..6a51b52 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
@@ -159,9 +159,7 @@ public class SchemaSourceTask extends SourceTask {
             count++;
             return result;
         } else {
-            synchronized (this) {
-                this.wait();
-            }
+            throttler.throttle();
             return new ArrayList<>();
         }
     }
@@ -170,4 +168,4 @@ public class SchemaSourceTask extends SourceTask {
     public void stop() {
         throttler.wakeup();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f812a8fd/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java
index b62f87c..03a51f2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java
@@ -79,6 +79,6 @@ public class ConnectorTaskId implements Serializable, Comparable<ConnectorTaskId
         int connectorCmp = connector.compareTo(o.connector);
         if (connectorCmp != 0)
             return connectorCmp;
-        return ((Integer) task).compareTo(o.task);
+        return Integer.compare(task, o.task);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f812a8fd/gradle/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/gradle/findbugs-exclude.xml b/gradle/findbugs-exclude.xml
index 932eb3e..833d5c6 100644
--- a/gradle/findbugs-exclude.xml
+++ b/gradle/findbugs-exclude.xml
@@ -61,4 +61,38 @@
              benchmarking. -->
         <Package name="org.apache.kafka.jmh.cache.generated"/>
     </Match>
+
+    <Match>
+        <!-- Suppress warnings about comparing a config string to
+             ConfigDef.NO_DEFAULT_VALUE using object equality.  This is intentional. -->
+        <Class name="org.apache.kafka.connect.runtime.AbstractHerder"/>
+        <Method name="convertConfigKey"/>
+        <Bug pattern="ES_COMPARING_STRINGS_WITH_EQ"/>
+    </Match>
+
+    <Match>
+        <!-- Suppress warnings about ignoring the return value of await.
+             This is done intentionally because we use other clues to determine
+             if the wait was cut short. -->
+        <Class name="org.apache.kafka.connect.runtime.WorkerSourceTask"/>
+        <Method name="execute"/>
+        <Bug pattern="RV_RETURN_VALUE_IGNORED"/>
+    </Match>
+
+    <Match>
+        <!-- Suppress some warnings about intentional switch statement fallthrough. -->
+        <Class name="org.apache.kafka.connect.runtime.WorkerConnector"/>
+        <Or>
+            <Method name="doStart"/>
+            <Method name="pause"/>
+        </Or>
+        <Bug pattern="SF_SWITCH_FALLTHROUGH"/>
+    </Match>
+
+    <Match>
+        <!-- Suppress some inconsistent synchronization warnings.  TODO: fix these.  See
+             KAFKA-4994. -->
+        <Class name="org.apache.kafka.connect.storage.OffsetStorageWriter"/>
+        <Bug pattern="IS2_INCONSISTENT_SYNC"/>
+    </Match>
 </FindBugsFilter>