Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/25 18:30:33 UTC

[GitHub] [kafka] lct45 opened a new pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

lct45 opened a new pull request #8929:
URL: https://github.com/apache/kafka/pull/8929


   Fix findbugs multithreaded correctness warnings for streams; update variables to be thread-safe.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] lct45 commented on pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

Posted by GitBox <gi...@apache.org>.
lct45 commented on pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#issuecomment-650384491


   Failed Connect tests on JDK 14:
   org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication
   org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorStart
   org.apache.kafka.connect.integration.BlockingConnectorTest.testWorkerRestartWithBlockInConnectorStart
   org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorInitialize



[GitHub] [kafka] vvcephei commented on pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#issuecomment-649836754


   Ok to test



[GitHub] [kafka] vvcephei commented on a change in pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#discussion_r445935686



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -226,7 +227,7 @@ public StreamsMetadata getLocalMetadata() {
             if (thisHost.equals(UNKNOWN_HOST)) {
                 return new KeyQueryMetadata(allMetadata.get(0).hostInfo(), Collections.emptySet(), -1);
             }
-            return new KeyQueryMetadata(localMetadata.hostInfo(), Collections.emptySet(), -1);
+            return new KeyQueryMetadata(localMetadata.get().hostInfo(), Collections.emptySet(), -1);

Review comment:
       Very good catch. Thanks, @abbccdda .





[GitHub] [kafka] ableegoldman commented on a change in pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#discussion_r446408964



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -226,7 +227,7 @@ public StreamsMetadata getLocalMetadata() {
             if (thisHost.equals(UNKNOWN_HOST)) {
                 return new KeyQueryMetadata(allMetadata.get(0).hostInfo(), Collections.emptySet(), -1);
             }
-            return new KeyQueryMetadata(localMetadata.hostInfo(), Collections.emptySet(), -1);
+            return new KeyQueryMetadata(localMetadata.get().hostInfo(), Collections.emptySet(), -1);

Review comment:
       This gets initialized during the rebalance and IQ isn't available until Streams has reached RUNNING. 
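A minimal sketch of the ordering argument above, assuming a simplified lifecycle in which the local metadata is written during a (simulated) rebalance and queries are rejected until the instance reports RUNNING. `LifecycleSketch`, `State`, `completeRebalance`, and `queryLocalHost` are hypothetical names, not the Kafka Streams API, and a host string stands in for `StreamsMetadata`.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the lifecycle described above; not Kafka Streams code.
class LifecycleSketch {
    enum State { CREATED, REBALANCING, RUNNING }

    private volatile State state = State.CREATED;
    // Stands in for localMetadata; holds a host string for simplicity.
    private final AtomicReference<String> localHost = new AtomicReference<>(null);

    // Simulates a rebalance completing: metadata is set first, then the state moves to RUNNING.
    void completeRebalance(String host) {
        state = State.REBALANCING;
        localHost.set(host);
        state = State.RUNNING;
    }

    // Queries are only served once RUNNING, by which point the metadata has been set.
    String queryLocalHost() {
        if (state != State.RUNNING) {
            throw new IllegalStateException("Not RUNNING yet: " + state);
        }
        return localHost.get();
    }
}
```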





[GitHub] [kafka] abbccdda commented on a change in pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#discussion_r445868515



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -226,7 +227,7 @@ public StreamsMetadata getLocalMetadata() {
             if (thisHost.equals(UNKNOWN_HOST)) {
                 return new KeyQueryMetadata(allMetadata.get(0).hostInfo(), Collections.emptySet(), -1);
             }
-            return new KeyQueryMetadata(localMetadata.hostInfo(), Collections.emptySet(), -1);
+            return new KeyQueryMetadata(localMetadata.get().hostInfo(), Collections.emptySet(), -1);

Review comment:
       Will this give us a NullPointerException? Do we need to set an initial value for localMetadata?

##########
File path: gradle/spotbugs-exclude.xml
##########
@@ -348,14 +348,11 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
     </Match>
 
     <Match>
-        <!-- TODO: fix this (see KAFKA-4996) -->
-        <Or>
-            <Package name="org.apache.kafka.streams.state.internals"/>
+        <!-- Suppress warning about a value that gets initialized

Review comment:
       What are we suppressing here?





[GitHub] [kafka] vvcephei commented on pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#issuecomment-651388235


   Retest this please



[GitHub] [kafka] ableegoldman commented on a change in pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#discussion_r445872345



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -50,7 +51,7 @@
     private final HostInfo thisHost;
     private List<StreamsMetadata> allMetadata = Collections.emptyList();
     private Cluster clusterMetadata;
-    private StreamsMetadata localMetadata;
+    private AtomicReference<StreamsMetadata> localMetadata;

Review comment:
       We actually need to initialize this now, although we can just initialize the value to null, i.e.:
   ```
   private AtomicReference<StreamsMetadata> localMetadata = new AtomicReference<>(null);
   ```
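To illustrate why the initializer matters: an `AtomicReference` field declared without an initializer is itself null, so calling `get()` on it throws a `NullPointerException`, whereas `new AtomicReference<>(null)` creates a non-null holder whose `get()` simply returns null. The class, field, and helper names below are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder contrasting the two field declarations discussed above.
class InitSketch {
    // No initializer: the field itself is null until something assigns it.
    AtomicReference<String> uninitialized;
    // Initialized holder: the reference object exists; its contained value is null.
    AtomicReference<String> initialized = new AtomicReference<>(null);

    // Returns true if calling get() on the given field throws a NullPointerException.
    static boolean getThrowsNpe(AtomicReference<String> ref) {
        try {
            ref.get();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }
}
```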

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -226,7 +227,7 @@ public StreamsMetadata getLocalMetadata() {
             if (thisHost.equals(UNKNOWN_HOST)) {
                 return new KeyQueryMetadata(allMetadata.get(0).hostInfo(), Collections.emptySet(), -1);
             }
-            return new KeyQueryMetadata(localMetadata.hostInfo(), Collections.emptySet(), -1);
+            return new KeyQueryMetadata(localMetadata.get().hostInfo(), Collections.emptySet(), -1);

Review comment:
       Yeah good catch, see above

##########
File path: gradle/spotbugs-exclude.xml
##########
@@ -348,14 +348,11 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
     </Match>
 
     <Match>
-        <!-- TODO: fix this (see KAFKA-4996) -->
-        <Or>
-            <Package name="org.apache.kafka.streams.state.internals"/>
+        <!-- Suppress warning about a value that gets initialized
+             before any other threads are created. -->
             <Package name="org.apache.kafka.streams.processor.internals"/>
-            <Package name="org.apache.kafka.streams.processor"/>
-            <Package name="org.apache.kafka.streams"/>
-        </Or>
-        <Bug pattern="IS2_INCONSISTENT_SYNC"/>
+            <Source name="InternalTopologyBuilder.java"/>
+            <Bug pattern="IS2_INCONSISTENT_SYNC"/>

Review comment:
       It's the `applicationId`. I guess we should specify that in the comment, since both you and Boyang asked.





[GitHub] [kafka] lct45 commented on pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

Posted by GitBox <gi...@apache.org>.
lct45 commented on pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#issuecomment-651329753


   @vvcephei @abbccdda this should be ready to be tested then merged!



[GitHub] [kafka] vvcephei commented on a change in pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#discussion_r445935511



##########
File path: gradle/spotbugs-exclude.xml
##########
@@ -348,14 +348,11 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
     </Match>
 
     <Match>
-        <!-- TODO: fix this (see KAFKA-4996) -->
-        <Or>
-            <Package name="org.apache.kafka.streams.state.internals"/>
+        <!-- Suppress warning about a value that gets initialized
+             before any other threads are created. -->
             <Package name="org.apache.kafka.streams.processor.internals"/>
-            <Package name="org.apache.kafka.streams.processor"/>
-            <Package name="org.apache.kafka.streams"/>
-        </Or>
-        <Bug pattern="IS2_INCONSISTENT_SYNC"/>
+            <Source name="InternalTopologyBuilder.java"/>
+            <Bug pattern="IS2_INCONSISTENT_SYNC"/>

Review comment:
       Quite a few folks have pushed for this, but I can't help feeling like it's the wrong way to go. It doesn't seem appropriate to pass the application configs to the topology builder to begin with.
   
   I'm long overdue in writing down my counterproposal, and I won't derail this PR by doing it here. I just wanted to register the possibility that there's a better way out of this mess than forcing users to provide their runtime configurations when compiling their program.
   
   Leaving that aside, I do think you're right. Findbugs is justified in flagging this because, although what we are doing is actually thread-safe, it's hacky and indistinguishable from unsafe code.
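The "initialized before any other threads are created" reasoning rests on a Java Memory Model rule: a call to `Thread.start()` happens-before every action in the started thread. A minimal sketch with hypothetical names: a plain (non-volatile) field is written under a lock and later read without one, which is the shape of access that IS2_INCONSISTENT_SYNC looks for. The read is correct here only because the write precedes `start()`, and nothing in the code itself makes that ordering visible to a static analyzer.

```java
// Hypothetical sketch: a field written under a lock but later read without one.
// This is safe here only because the write happens before Thread.start().
class PublishBeforeStartSketch {
    private String applicationId; // plain field, not volatile

    synchronized void setApplicationId(String id) {
        applicationId = id;
    }

    // Unsynchronized read: correct only because of the ordering in runWorker().
    String readApplicationId() {
        return applicationId;
    }

    // Writes the field, then starts a thread that reads it. Thread.start()
    // happens-before every action of the started thread, so the read sees the write.
    static String runWorker(String id) {
        PublishBeforeStartSketch sketch = new PublishBeforeStartSketch();
        sketch.setApplicationId(id);
        String[] seen = new String[1];
        Thread worker = new Thread(() -> seen[0] = sketch.readApplicationId());
        worker.start();
        try {
            worker.join(); // join() makes the worker's write to seen[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }
}
```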





[GitHub] [kafka] abbccdda commented on a change in pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#discussion_r446404393



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -50,7 +51,7 @@
     private final HostInfo thisHost;
     private List<StreamsMetadata> allMetadata = Collections.emptyList();
     private Cluster clusterMetadata;
-    private StreamsMetadata localMetadata;
+    private AtomicReference<StreamsMetadata> localMetadata = new AtomicReference<>(null);

Review comment:
       This could be final
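Making the field `final` works because only the contents of the `AtomicReference` change; the holder itself is assigned exactly once. A minimal sketch with a hypothetical class, where a `String` stands in for `StreamsMetadata` and `onChange` is an illustrative update hook:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: the holder is final; updates go through set()/get().
class FinalHolderSketch {
    private final AtomicReference<String> localMetadata = new AtomicReference<>(null);

    // Illustrative hook for when new assignment metadata arrives.
    void onChange(String newMetadata) {
        localMetadata.set(newMetadata); // mutates the contents, not the field
    }

    String current() {
        return localMetadata.get();
    }
}
```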

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -226,7 +227,7 @@ public StreamsMetadata getLocalMetadata() {
             if (thisHost.equals(UNKNOWN_HOST)) {
                 return new KeyQueryMetadata(allMetadata.get(0).hostInfo(), Collections.emptySet(), -1);
             }
-            return new KeyQueryMetadata(localMetadata.hostInfo(), Collections.emptySet(), -1);
+            return new KeyQueryMetadata(localMetadata.get().hostInfo(), Collections.emptySet(), -1);

Review comment:
       Hmm, why do we still keep it? Based on the reviews of the previous version, I believe there is a strict ordering that guarantees `localMetadata` is initialized to a non-null value on L352 before this logic is hit, but a null check still sounds more resilient to me, unless we want a NullPointerException to be thrown explicitly.
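The two options raised here can be sketched side by side: fail fast with a descriptive `NullPointerException` via `Objects.requireNonNull`, or return a sentinel when no local metadata exists. `NullCheckSketch` and its `NOT_AVAILABLE` string are hypothetical stand-ins, not Kafka types. Note that the sentinel variant reads the reference once, so the null check and the use see the same value even if another thread updates it in between.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the two options: fail fast with a clear message,
// or return a sentinel when local metadata has not been set.
class NullCheckSketch {
    static final String NOT_AVAILABLE = "unavailable:-1"; // hypothetical sentinel

    private final AtomicReference<String> localHost = new AtomicReference<>(null);

    void setLocalHost(String host) {
        localHost.set(host);
    }

    // Option 1: explicit NPE with a descriptive message.
    String localHostOrThrow() {
        return Objects.requireNonNull(localHost.get(), "local metadata has not been initialized");
    }

    // Option 2: degrade gracefully. Read once so the check and the use see the same value.
    String localHostOrSentinel() {
        String host = localHost.get();
        return host == null ? NOT_AVAILABLE : host;
    }
}
```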





[GitHub] [kafka] vvcephei commented on a change in pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#discussion_r445864937



##########
File path: gradle/spotbugs-exclude.xml
##########
@@ -348,14 +348,11 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
     </Match>
 
     <Match>
-        <!-- TODO: fix this (see KAFKA-4996) -->
-        <Or>
-            <Package name="org.apache.kafka.streams.state.internals"/>
+        <!-- Suppress warning about a value that gets initialized
+             before any other threads are created. -->
             <Package name="org.apache.kafka.streams.processor.internals"/>
-            <Package name="org.apache.kafka.streams.processor"/>
-            <Package name="org.apache.kafka.streams"/>
-        </Or>
-        <Bug pattern="IS2_INCONSISTENT_SYNC"/>
+            <Source name="InternalTopologyBuilder.java"/>
+            <Bug pattern="IS2_INCONSISTENT_SYNC"/>

Review comment:
       Interesting... which value is it? Looking at the class, it seems like it's probably the builder methods, which should all be called before passing the pre-built topology to the StreamThreads. I agree this should be safe, but it still might be nice to clean up the class so that this suppression isn't necessary.
   
   Can you expand this XML comment a little to explain what would need to be done to resolve this suppression?





[GitHub] [kafka] ableegoldman commented on a change in pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#discussion_r445877301



##########
File path: gradle/spotbugs-exclude.xml
##########
@@ -348,14 +348,11 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
     </Match>
 
     <Match>
-        <!-- TODO: fix this (see KAFKA-4996) -->
-        <Or>
-            <Package name="org.apache.kafka.streams.state.internals"/>
+        <!-- Suppress warning about a value that gets initialized
+             before any other threads are created. -->
             <Package name="org.apache.kafka.streams.processor.internals"/>
-            <Package name="org.apache.kafka.streams.processor"/>
-            <Package name="org.apache.kafka.streams"/>
-        </Or>
-        <Bug pattern="IS2_INCONSISTENT_SYNC"/>
+            <Source name="InternalTopologyBuilder.java"/>
+            <Bug pattern="IS2_INCONSISTENT_SYNC"/>

Review comment:
       We get this from the configs, which are passed in to `rewriteTopology` when it's called by the `KafkaStreams` constructor. Since we don't always pass the configs in to the topology, we can't initialize this when the InternalTopologyBuilder is constructed.
   That said, I'd be in favor of requiring users to pass the configs in to the topology _instead_ of passing them in through the `KafkaStreams` constructor. In fact, we already force users who want an optimized topology to pass the configs in to the topology AND to the KafkaStreams, which is confusing. If the topology sometimes needs the configs, and we always pass the topology to the KafkaStreams, we should just let KafkaStreams get the configs from the topology.
   
   i.e., I'd like to deprecate all `KafkaStreams` constructors with config arguments (which I think is all of them?) and all Topology/StreamsBuilder#build/etc. methods without the config argument. Then we could just initialize the application id in the constructor. Of course, this would require a KIP and is way out of scope here; I'm just pointing out why this is so weird.
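A minimal sketch contrasting the two shapes described above, using hypothetical classes rather than `InternalTopologyBuilder`: in the current shape the application id is filled in later from separately supplied configs, so the field cannot be `final`; in the proposed shape the configs arrive with the builder, so the id can be a `final` field, which the Java Memory Model guarantees is visible to other threads once construction completes (provided `this` does not escape the constructor). Only the `"application.id"` key is taken from Kafka Streams; everything else is illustrative.

```java
import java.util.Properties;

// Hypothetical contrast between the two shapes discussed above; neither class is Kafka code.

// Current shape (simplified): the id is filled in later, from configs handed over separately.
class LateInitBuilder {
    private String applicationId; // null until rewrite(...) runs

    synchronized LateInitBuilder rewrite(Properties configs) {
        applicationId = configs.getProperty("application.id");
        return this;
    }

    String applicationId() {
        return applicationId; // unsynchronized read
    }
}

// Proposed shape: configs arrive with the builder, so the id can be final.
class EagerInitBuilder {
    private final String applicationId;

    EagerInitBuilder(Properties configs) {
        this.applicationId = configs.getProperty("application.id");
    }

    String applicationId() {
        return applicationId; // final field: safely published after construction
    }
}
```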





[GitHub] [kafka] ableegoldman commented on a change in pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#discussion_r446308395



##########
File path: gradle/spotbugs-exclude.xml
##########
@@ -348,14 +348,11 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
     </Match>
 
     <Match>
-        <!-- TODO: fix this (see KAFKA-4996) -->
-        <Or>
-            <Package name="org.apache.kafka.streams.state.internals"/>
+        <!-- Suppress warning about a value that gets initialized
+             before any other threads are created. -->
             <Package name="org.apache.kafka.streams.processor.internals"/>
-            <Package name="org.apache.kafka.streams.processor"/>
-            <Package name="org.apache.kafka.streams"/>
-        </Or>
-        <Bug pattern="IS2_INCONSISTENT_SYNC"/>
+            <Source name="InternalTopologyBuilder.java"/>
+            <Bug pattern="IS2_INCONSISTENT_SYNC"/>

Review comment:
       Fair enough. I eagerly await your alternative proposal :P 





[GitHub] [kafka] vvcephei commented on pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#issuecomment-650271007


   Retest this please



[GitHub] [kafka] vvcephei commented on pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#issuecomment-650270782







[GitHub] [kafka] vvcephei commented on pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#issuecomment-651387931







[GitHub] [kafka] ableegoldman commented on a change in pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#discussion_r446431689



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -226,7 +227,7 @@ public StreamsMetadata getLocalMetadata() {
             if (thisHost.equals(UNKNOWN_HOST)) {
                 return new KeyQueryMetadata(allMetadata.get(0).hostInfo(), Collections.emptySet(), -1);
             }
-            return new KeyQueryMetadata(localMetadata.hostInfo(), Collections.emptySet(), -1);
+            return new KeyQueryMetadata(localMetadata.get().hostInfo(), Collections.emptySet(), -1);

Review comment:
       I looked through the StreamsMetadataState and it does seem like it could technically be null if this instance was never assigned any active or standby tasks at all, ever. That really _shouldn't_ happen, but of course it can if you massively over-provisioned your app and we shouldn't throw an NPE over that.
   
   Seems like this is actually an existing bug that we should fix. Then we can improve the initialization check on the side.
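A minimal model of the over-provisioned case described above, under the simplifying assumption that local metadata is derived only from the tasks assigned to this host: an instance with no active or standby tasks ends up with no local metadata, so a lookup must handle null rather than dereference it. `OverProvisionedSketch`, its methods, and the `NOT_AVAILABLE` string are hypothetical, and active and standby tasks are merged into one set for brevity.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the over-provisioned case: an instance whose host has no
// tasks never gets local metadata, so lookups must handle null.
class OverProvisionedSketch {
    static final String NOT_AVAILABLE = "unavailable:-1"; // hypothetical sentinel

    // assignment maps host -> task ids (active and standby combined for brevity).
    static String localMetadataFor(String thisHost, Map<String, Set<String>> assignment) {
        Set<String> tasks = assignment.getOrDefault(thisHost, Collections.emptySet());
        return tasks.isEmpty() ? null : thisHost + " tasks=" + tasks;
    }

    // Returns the sentinel instead of throwing an NPE when this host has no tasks.
    static String lookup(String thisHost, Map<String, Set<String>> assignment) {
        String local = localMetadataFor(thisHost, assignment);
        return local == null ? NOT_AVAILABLE : local;
    }
}
```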





[GitHub] [kafka] lct45 commented on pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

Posted by GitBox <gi...@apache.org>.
lct45 commented on pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#issuecomment-649756013


   @ableegoldman 

