You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/04/04 00:57:18 UTC
kafka git commit: KAFKA-4977: Fix findbugs issues in connect/runtime
Repository: kafka
Updated Branches:
refs/heads/trunk ca2979f84 -> f812a8fd9
KAFKA-4977: Fix findbugs issues in connect/runtime
Author: Colin P. Mccabe <cm...@confluent.io>
Reviewers: Konstantine Karantasis <ko...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #2763 from cmccabe/KAFKA-4977
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f812a8fd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f812a8fd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f812a8fd
Branch: refs/heads/trunk
Commit: f812a8fd93c37cb27db07505dd5f8f29463906a1
Parents: ca2979f
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Mon Apr 3 17:57:12 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Apr 3 17:57:12 2017 -0700
----------------------------------------------------------------------
.../apache/kafka/connect/runtime/Worker.java | 6 ++--
.../runtime/distributed/DistributedHerder.java | 15 +++++++++
.../rest/entities/ConnectorStateInfo.java | 16 +++++++++
.../kafka/connect/tools/SchemaSourceTask.java | 6 ++--
.../kafka/connect/util/ConnectorTaskId.java | 2 +-
gradle/findbugs-exclude.xml | 34 ++++++++++++++++++++
6 files changed, 71 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f812a8fd/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 1801e1b..400ae08 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -102,9 +102,9 @@ public class Worker {
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
// These settings are designed to ensure there is no data loss. They *may* be overridden via configs passed to the
// worker, but this may compromise the delivery guarantees of Kafka Connect.
- producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, ((Integer) Integer.MAX_VALUE).toString());
- producerProps.put(ProducerConfig.RETRIES_CONFIG, ((Integer) Integer.MAX_VALUE).toString());
- producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, ((Long) Long.MAX_VALUE).toString());
+ producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
+ producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
+ producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
// User-specified overrides
http://git-wip-us.apache.org/repos/asf/kafka/blob/f812a8fd/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index cf30aca..e908d0b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -54,6 +54,7 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
@@ -1116,6 +1117,20 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
final int cmp = Long.compare(at, o.at);
return cmp == 0 ? Long.compare(seq, o.seq) : cmp;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof HerderRequest))
+ return false;
+ HerderRequest other = (HerderRequest) o;
+ return compareTo(other) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(at, seq);
+ }
}
private static final Callback<Void> forwardErrorCallback(final Callback<?> callback) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f812a8fd/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
index defe2bb..c19e20b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
+import java.util.Objects;
public class ConnectorStateInfo {
@@ -103,6 +104,21 @@ public class ConnectorStateInfo {
public int compareTo(TaskState that) {
return Integer.compare(this.id, that.id);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this)
+ return true;
+ if (!(o instanceof TaskState))
+ return false;
+ TaskState other = (TaskState) o;
+ return compareTo(other) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f812a8fd/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
index 2955fb5..6a51b52 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
@@ -159,9 +159,7 @@ public class SchemaSourceTask extends SourceTask {
count++;
return result;
} else {
- synchronized (this) {
- this.wait();
- }
+ throttler.throttle();
return new ArrayList<>();
}
}
@@ -170,4 +168,4 @@ public class SchemaSourceTask extends SourceTask {
public void stop() {
throttler.wakeup();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f812a8fd/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java
index b62f87c..03a51f2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java
@@ -79,6 +79,6 @@ public class ConnectorTaskId implements Serializable, Comparable<ConnectorTaskId
int connectorCmp = connector.compareTo(o.connector);
if (connectorCmp != 0)
return connectorCmp;
- return ((Integer) task).compareTo(o.task);
+ return Integer.compare(task, o.task);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f812a8fd/gradle/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/gradle/findbugs-exclude.xml b/gradle/findbugs-exclude.xml
index 932eb3e..833d5c6 100644
--- a/gradle/findbugs-exclude.xml
+++ b/gradle/findbugs-exclude.xml
@@ -61,4 +61,38 @@
benchmarking. -->
<Package name="org.apache.kafka.jmh.cache.generated"/>
</Match>
+
+ <Match>
+ <!-- Suppress warnings about comparing a config string to
+ ConfigDef.NO_DEFAULT_VALUE using object equality. This is intentional. -->
+ <Class name="org.apache.kafka.connect.runtime.AbstractHerder"/>
+ <Method name="convertConfigKey"/>
+ <Bug pattern="ES_COMPARING_STRINGS_WITH_EQ"/>
+ </Match>
+
+ <Match>
+ <!-- Suppress warnings about ignoring the return value of await.
+ This is done intentionally because we use other clues to determine
+ if the wait was cut short. -->
+ <Class name="org.apache.kafka.connect.runtime.WorkerSourceTask"/>
+ <Method name="execute"/>
+ <Bug pattern="RV_RETURN_VALUE_IGNORED"/>
+ </Match>
+
+ <Match>
+ <!-- Suppress some warnings about intentional switch statement fallthrough. -->
+ <Class name="org.apache.kafka.connect.runtime.WorkerConnector"/>
+ <Or>
+ <Method name="doStart"/>
+ <Method name="pause"/>
+ </Or>
+ <Bug pattern="SF_SWITCH_FALLTHROUGH"/>
+ </Match>
+
+ <Match>
+ <!-- Suppress some inconsistent synchronization warnings. TODO: fix these. See
+ KAFKA-4994. -->
+ <Class name="org.apache.kafka.connect.storage.OffsetStorageWriter"/>
+ <Bug pattern="IS2_INCONSISTENT_SYNC"/>
+ </Match>
</FindBugsFilter>