You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/02/25 21:49:00 UTC

[jira] [Commented] (KAFKA-4922) Fix several FindBugs warnings in Clients and Connect

    [ https://issues.apache.org/jira/browse/KAFKA-4922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16376265#comment-16376265 ] 

ASF GitHub Bot commented on KAFKA-4922:
---------------------------------------

hachikuji closed pull request #2710: KAFKA-4922: - Minor FindBugs warning fixes
URL: https://github.com/apache/kafka/pull/2710
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
index 78340e54dae..43ea712043a 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
@@ -73,6 +73,9 @@ public int hashCode() {
 
     @Override
     public boolean equals(Object other) {
+		if (other == null)
+			return false;
+		
         if (this == other)
             return true;
 
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java
index e15f69836b0..75b21c92450 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java
@@ -51,8 +51,8 @@
     public static SchemaBuilder builder(int scale) {
         return SchemaBuilder.bytes()
                 .name(LOGICAL_NAME)
-                .parameter(SCALE_FIELD, ((Integer) scale).toString())
-                .version(1);
+                .parameter(SCALE_FIELD, Integer.toString(scale))
+                .version(1); 
     }
 
     public static Schema schema(int scale) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 3703ed958e5..91c9a37b88a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -103,8 +103,8 @@ public Worker(String workerId, Time time, ConnectorFactory connectorFactory, Wor
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
         // These settings are designed to ensure there is no data loss. They *may* be overridden via configs passed to the
         // worker, but this may compromise the delivery guarantees of Kafka Connect.
-        producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, ((Integer) Integer.MAX_VALUE).toString());
-        producerProps.put(ProducerConfig.RETRIES_CONFIG, ((Integer) Integer.MAX_VALUE).toString());
+        producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); 
+        producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
         producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, ((Long) Long.MAX_VALUE).toString());
         producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
         producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java
index 8176d82322f..020b327cb87 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java
@@ -80,6 +80,6 @@ public int compareTo(ConnectorTaskId o) {
         int connectorCmp = connector.compareTo(o.connector);
         if (connectorCmp != 0)
             return connectorCmp;
-        return ((Integer) task).compareTo(o.task);
+        return Integer.compare(task, o.task);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 21135fb23b4..64187e724dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -405,6 +405,20 @@ void writeAsText(final String filePath,
      */
     void foreach(final ForeachAction<? super K, ? super V> action);
 
+    /**
+     * Perform an action on each record of {@code KStream}.
+     * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)}).
+     * <p>
+     * Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection)
+     * and returns an unchanged stream.
+     * <p>
+     * Note that since this operation is stateless, it may execute multiple times for a single record in failure cases.
+     *
+     * @param action an action to perform on each record
+     * @see #process(ProcessorSupplier, String...)
+     */
+    KStream<K, V> peek(final ForeachAction<? super K, ? super V> action);
+
     /**
      * Creates an array of {@code KStream} from this stream by branching the records in the original stream based on
      * the supplied predicates.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 0434f06c7fe..f325dcfaa55 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -57,6 +57,8 @@
 
     public static final String FILTER_NAME = "KSTREAM-FILTER-";
 
+    public static final String PEEK_NAME = "KSTREAM-PEEK-";
+
     private static final String FLATMAP_NAME = "KSTREAM-FLATMAP-";
 
     private static final String FLATMAPVALUES_NAME = "KSTREAM-FLATMAPVALUES-";
@@ -317,6 +319,16 @@ public void foreach(ForeachAction<? super K, ? super V> action) {
         topology.addProcessor(name, new KStreamForeach<>(action), this.name);
     }
 
+    @Override
+    public KStream<K, V> peek(final ForeachAction<? super K, ? super V> action) {
+        Objects.requireNonNull(action, "action can't be null");
+        final String name = topology.newName(PEEK_NAME);
+
+        topology.addProcessor(name, new KStreamPeek<>(action), this.name);
+
+        return new KStreamImpl<>(topology, name, sourceNodes, repartitionRequired);
+    }
+
     @Override
     public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
         return through(keySerde, valSerde, null, topic);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java
new file mode 100644
index 00000000000..3dc05131316
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+class KStreamPeek<K, V> implements ProcessorSupplier<K, V> {
+
+    private final ForeachAction<K, V> action;
+
+    public KStreamPeek(final ForeachAction<K, V> action) {
+        this.action = action;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamPeekProcessor();
+    }
+
+    private class KStreamPeekProcessor extends AbstractProcessor<K, V> {
+        @Override
+        public void process(final K key, final V value) {
+            action.apply(key, value);
+            context().forward(key, value);
+        }
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
new file mode 100644
index 00000000000..48f4b653b8a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.junit.After;
+import org.junit.Test;
+
+public class KStreamPeekTest {
+
+    private final String topicName = "topic";
+
+    private KStreamTestDriver driver = null;
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+    }
+
+    @Test
+    public void shouldObserveStreamElements() {
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<Integer, String> stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
+        final List<KeyValue<Integer, String>> peekObserved = new ArrayList<>(), streamObserved = new ArrayList<>();
+        stream.peek(collect(peekObserved)).foreach(collect(streamObserved));
+
+        driver = new KStreamTestDriver(builder);
+        final List<KeyValue<Integer, String>> expected = new ArrayList<>();
+        for (int key = 0; key < 32; key++) {
+            final String value = "V" + key;
+            driver.process(topicName, key, value);
+            expected.add(new KeyValue<>(key, value));
+        }
+
+        assertEquals(expected, peekObserved);
+        assertEquals(expected, streamObserved);
+    }
+
+    @Test
+    public void shouldNotAllowNullAction() {
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<Integer, String> stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
+        try {
+            stream.peek(null);
+            fail("expected null action to throw NPE");
+        } catch (NullPointerException expected) { }
+    }
+
+    private static <K, V> ForeachAction<K, V> collect(final List<KeyValue<K, V>> into) {
+        return new ForeachAction<K, V>() {
+            @Override
+            public void apply(final K key, final V value) {
+                into.add(new KeyValue<>(key, value));
+            }
+        };
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Fix several FindBugs warnings in Clients and Connect
> ----------------------------------------------------
>
>                 Key: KAFKA-4922
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4922
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients, KafkaConnect
>    Affects Versions: 0.10.2.0
>         Environment: Identified by FindBugs; not specific to any software or platform
>            Reporter: Daan Rennings
>            Priority: Minor
>              Labels: newbie
>             Fix For: 0.11.0.0
>
>         Attachments: ClientsFindBugsReport.html, ConnectAPIFindBugsReport.html, ConnectRuntimeFindBugsReport.html
>
>
> Four easy-to-fix warnings (not the complete set of current FindBugs warnings), as identified by FindBugs and stated in the attached reports:
> - org.apache.kafka.common.utils.Bytes.equals(Object) does not check for null argument (Low Priority, Bad Practice)
> - Primitive boxed just to call toString in org.apache.kafka.connect.data.Decimal.builder(int) (High Priority, Performance)
> - Primitive boxed just to call toString in new org.apache.kafka.connect.runtime.Worker(String, Time, ConnectorFactory, WorkerConfig, OffsetBackingStore) (High Priority, Performance)
> - Primitive is boxed to call Integer.compareTo(Integer) in org.apache.kafka.connect.util.ConnectorTaskId.compareTo(ConnectorTaskId): use Integer.compare(int, int) instead (High Priority, Performance)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)