Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/27 05:42:50 UTC

[GitHub] [kafka] guozhangwang opened a new pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed [WIP]

guozhangwang opened a new pull request #9083:
URL: https://github.com/apache/kafka/pull/9083


   Should be reviewed after https://github.com/apache/kafka/pull/8964 is merged, in which we first commit (flush) then suspend.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9083:
URL: https://github.com/apache/kafka/pull/9083#discussion_r492941448



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1566,6 +1566,7 @@ public void shouldCheckpointForSuspendedTask() {
         EasyMock.verify(stateManager);
     }
 
+

Review comment:
       nit: not necessary

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -217,12 +217,12 @@ public StateStore getStateStore(final String name) {
                     forward((ProcessorNode<K, V, ?, ?>) child, key, value);
                 }
             } else {
-                final ProcessorNode<K, V, ?, ?> child = currentNode().getChild(sendTo);

Review comment:
       +1

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
##########
@@ -44,19 +45,18 @@ public KStreamFlatTransformValues(final ValueTransformerWithKeySupplier<KIn, VIn
         return valueTransformerSupplier.stores();
     }
 
-    public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> implements Processor<KIn, VIn> {
+    public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> extends AbstractProcessor<KIn, VIn> {

Review comment:
       Could you elaborate why this is better than Processor?







[GitHub] [kafka] ableegoldman commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9083:
URL: https://github.com/apache/kafka/pull/9083#discussion_r492282511



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -217,12 +217,12 @@ public StateStore getStateStore(final String name) {
                     forward((ProcessorNode<K, V, ?, ?>) child, key, value);
                 }
             } else {
-                final ProcessorNode<K, V, ?, ?> child = currentNode().getChild(sendTo);

Review comment:
       Why change the `K, V` to `?, ?` -- and if we don't do that, do we still need the cast in the `forward` call down below?







[GitHub] [kafka] guozhangwang commented on pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9083:
URL: https://github.com/apache/kafka/pull/9083#issuecomment-695635868


   cc @abbccdda for a final review.





[GitHub] [kafka] guozhangwang commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9083:
URL: https://github.com/apache/kafka/pull/9083#discussion_r492944504



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -217,12 +217,12 @@ public StateStore getStateStore(final String name) {
                     forward((ProcessorNode<K, V, ?, ?>) child, key, value);
                 }
             } else {
-                final ProcessorNode<K, V, ?, ?> child = currentNode().getChild(sendTo);

Review comment:
       I made the change in ProcessorNode to add back the template types: https://github.com/apache/kafka/pull/9083/files/82b6f6f5d238401097e0906c8135c5c189524666#diff-705bfd0ed3f214048b76d775708cc7d2L96
   
   But since `currentNode()`'s template is `<?, ?, ?, ?>`, its templated `getChild` returns a wildcard-typed node as well, and that's why I need to weaken the type here. As you can see from the `if` branch above, the typing is now consistent between the two branches.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
##########
@@ -44,19 +45,18 @@ public KStreamFlatTransformValues(final ValueTransformerWithKeySupplier<KIn, VIn
         return valueTransformerSupplier.stores();
     }
 
-    public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> implements Processor<KIn, VIn> {
+    public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> extends AbstractProcessor<KIn, VIn> {

Review comment:
       As described at the top: `Let all built-in processors to extend from AbstractProcessor.` The main reason is that AbstractProcessor provides some basic functionality, so it's better to base our own implementations on it.
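
       To illustrate the point about shared base functionality, here is a minimal sketch. It does not use Kafka's real classes: `Context`, `Processor`, `AbstractBase`, and `Upper` are hypothetical, simplified stand-ins, and the assumption is that the base class stores the context in `init`, exposes it through an accessor, and supplies a no-op `close`, so a subclass only has to write `process`.

```java
// Hypothetical, simplified types for illustration; not the Kafka Streams API.
public class AbstractBaseSketch {

    // Stand-in for a processor context (assumption: only a task id is needed here).
    interface Context {
        String taskId();
    }

    // Stand-in for a processor interface with the full lifecycle.
    interface Processor<K, V> {
        void init(Context context);
        void process(K key, V value);
        void close();
    }

    // Base class that handles the bookkeeping every implementation would repeat.
    abstract static class AbstractBase<K, V> implements Processor<K, V> {
        private Context context;

        @Override
        public void init(final Context context) {
            this.context = context;
        }

        @Override
        public void close() {
            // nothing to release by default
        }

        protected final Context context() {
            return context;
        }
    }

    // A concrete processor only needs to implement process().
    static class Upper extends AbstractBase<String, String> {
        final StringBuilder out = new StringBuilder();

        @Override
        public void process(final String key, final String value) {
            out.append(context().taskId()).append(':')
               .append(key).append('=').append(value.toUpperCase());
        }
    }

    public static void main(final String[] args) {
        final Upper processor = new Upper();
        processor.init(() -> "0_0");
        processor.process("k", "v");
        processor.close();
        System.out.println(processor.out);
    }
}
```

       A class that implemented the interface directly would have to repeat the `init` and `close` bodies itself, which is the duplication the comment above argues against.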







[GitHub] [kafka] guozhangwang commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9083:
URL: https://github.com/apache/kafka/pull/9083#discussion_r492950073



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
##########
@@ -44,19 +45,18 @@ public KStreamFlatTransformValues(final ValueTransformerWithKeySupplier<KIn, VIn
         return valueTransformerSupplier.stores();
     }
 
-    public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> implements Processor<KIn, VIn> {
+    public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> extends AbstractProcessor<KIn, VIn> {

Review comment:
       As described at the top: `Let all built-in processors to extend from AbstractProcessor.` The main reason is that AbstractProcessor provides some basic functionality, so it's better to base our own implementations on it.







[GitHub] [kafka] mjsax commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9083:
URL: https://github.com/apache/kafka/pull/9083#discussion_r497709235



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -217,12 +217,12 @@ public StateStore getStateStore(final String name) {
                     forward((ProcessorNode<K, V, ?, ?>) child, key, value);
                 }
             } else {
-                final ProcessorNode<K, V, ?, ?> child = currentNode().getChild(sendTo);
+                final ProcessorNode<?, ?, ?, ?> child = currentNode().getChild(sendTo);
                 if (child == null) {
                     throw new StreamsException("Unknown downstream node: " + sendTo
                         + " either does not exist or is not connected to this processor.");
                 }
-                forward(child, key, value);
+                forward((ProcessorNode<K, V, ?, ?>) child, key, value);

Review comment:
       @guozhangwang It's weird to remove the types above and add a cast here. Seems like a step backward with regard to type safety. Not sure if @vvcephei's changes would fix it?
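
       The type-safety concern can be reproduced in isolation. The sketch below is a simplified illustration, not the actual `ProcessorContextImpl` code: `Node` is a hypothetical two-parameter stand-in for the four-parameter `ProcessorNode`, and `forward` is a hypothetical static helper. It shows that once `getChild` can only return a wildcard-typed node, calling `process` requires an unchecked cast that the compiler cannot verify.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified types for illustration; not the Kafka Streams internals.
public class WildcardCastSketch {

    static class Node<K, V> {
        private final Map<String, Node<?, ?>> children = new HashMap<>();
        final StringBuilder seen = new StringBuilder();

        void addChild(final String name, final Node<?, ?> child) {
            children.put(name, child);
        }

        // The children's key/value types are not known statically,
        // so the lookup can only promise a wildcard-typed node.
        Node<?, ?> getChild(final String name) {
            return children.get(name);
        }

        void process(final K key, final V value) {
            seen.append(key).append('=').append(value);
        }
    }

    @SuppressWarnings("unchecked")
    static <K, V> void forward(final Node<?, ?> parent, final String sendTo, final K key, final V value) {
        final Node<?, ?> child = parent.getChild(sendTo);
        if (child == null) {
            throw new IllegalStateException("Unknown downstream node: " + sendTo);
        }
        // child.process(key, value) would not compile on Node<?, ?>.
        // The cast compiles, but only with an unchecked warning: nothing
        // checks at compile time that the child really accepts <K, V>.
        ((Node<K, V>) child).process(key, value);
    }

    public static void main(final String[] args) {
        final Node<String, String> parent = new Node<>();
        final Node<String, Integer> child = new Node<>();
        parent.addChild("child", child);
        forward(parent, "child", "a", 1);
        System.out.println(child.seen);
    }
}
```

       Whether the cast sits on the declaration or on the call, the check is deferred to runtime either way; the sketch only shows why a wildcard-typed parent forces it somewhere.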












[GitHub] [kafka] guozhangwang commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9083:
URL: https://github.com/apache/kafka/pull/9083#discussion_r491646523



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -217,7 +217,7 @@ public StateStore getStateStore(final String name) {
                     forward((ProcessorNode<K, V, ?, ?>) child, key, value);
                 }
             } else {
-                final ProcessorNode<K, V, ?, ?> child = currentNode().getChild(sendTo);
+                final ProcessorNode child = currentNode().getChild(sendTo);

Review comment:
       This is because I added the typing in line 98 of ProcessorNode below. I will revert it and update the line below in `forward`.







[GitHub] [kafka] guozhangwang commented on pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed [WIP]

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9083:
URL: https://github.com/apache/kafka/pull/9083#issuecomment-664699699


   cc @ableegoldman @mjsax 





[GitHub] [kafka] ableegoldman commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9083:
URL: https://github.com/apache/kafka/pull/9083#discussion_r489129386



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -37,6 +37,7 @@
 
     /**
      * If the checkpoint has not been loaded from the file yet (null), then we should not overwrite the checkpoint;
+     * If the checkpoint has been loaded form the file and has never been re-written (empty map), then we should re-write the checkpoint;

Review comment:
       ```suggestion
        * If the checkpoint has been loaded from the file and has never been re-written (empty map), then we should re-write the checkpoint;
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -217,7 +217,7 @@ public StateStore getStateStore(final String name) {
                     forward((ProcessorNode<K, V, ?, ?>) child, key, value);
                 }
             } else {
-                final ProcessorNode<K, V, ?, ?> child = currentNode().getChild(sendTo);
+                final ProcessorNode child = currentNode().getChild(sendTo);

Review comment:
       Rebasing error?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1310,9 +1310,11 @@ public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() {
         EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition, offset)).anyTimes();
         stateManager.checkpoint();
         EasyMock.expectLastCall().once();
+        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition));

Review comment:
       Why does this need to change?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
##########
@@ -149,10 +155,20 @@ public void close() {
         } catch (final Exception e) {

Review comment:
       Should we `throwIfClosed()` at the beginning of `close()` too?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
##########
@@ -118,6 +120,10 @@ public void init(final InternalProcessorContext context) {
         } catch (final Exception e) {
             throw new StreamsException(String.format("failed to initialize processor %s", name), e);
         }
+
+        // revived tasks could re-initialize the topology,
+        // in which case we should reset the flag
+        closed = false;

Review comment:
       Should we assert that the node is `closed` when `init` is called?
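
       For readers following the `closed` flag discussion, here is a minimal sketch of the lifecycle being reviewed. It is not the actual `ProcessorNode`: `Node` is a hypothetical stand-in, the exception type and message are assumptions, and calling `throwIfClosed()` at the start of `close()` is only the option raised in the comment above, not something the diff excerpt confirms.

```java
// Hypothetical, simplified node for illustration; not Kafka's ProcessorNode.
public class ClosedNodeSketch {

    static class Node {
        private final String name;
        private boolean closed = false;
        private int processed = 0;

        Node(final String name) {
            this.name = name;
        }

        void init() {
            // A revived task may re-initialize the topology,
            // in which case the flag has to be reset.
            closed = false;
        }

        void process(final String key, final String value) {
            throwIfClosed();
            processed++;
        }

        void close() {
            // Assumption: guard close() as well, as suggested in review.
            throwIfClosed();
            closed = true;
        }

        int processed() {
            return processed;
        }

        private void throwIfClosed() {
            if (closed) {
                throw new IllegalStateException("The processor " + name + " is closed.");
            }
        }
    }

    public static void main(final String[] args) {
        final Node node = new Node("sketch-node");
        node.init();
        node.process("k", "v");
        node.close();
        try {
            node.process("k", "v");
        } catch (final IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        // Re-initialization makes the node usable again.
        node.init();
        node.process("k", "v");
        System.out.println("processed = " + node.processed());
    }
}
```

       With the guard in `close()`, a double close fails loudly instead of passing silently, which is the trade-off the two review questions are weighing.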







[GitHub] [kafka] guozhangwang commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9083:
URL: https://github.com/apache/kafka/pull/9083#discussion_r491647837



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
##########
@@ -118,6 +120,10 @@ public void init(final InternalProcessorContext context) {
         } catch (final Exception e) {
             throw new StreamsException(String.format("failed to initialize processor %s", name), e);
         }
+
+        // revived tasks could re-initialize the topology,
+        // in which case we should reset the flag
+        closed = false;

Review comment:
       SG.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1310,9 +1310,11 @@ public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() {
         EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition, offset)).anyTimes();
         stateManager.checkpoint();
         EasyMock.expectLastCall().once();
+        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition));

Review comment:
       My bad, will revert.







[GitHub] [kafka] guozhangwang merged pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #9083:
URL: https://github.com/apache/kafka/pull/9083


   








[GitHub] [kafka] guozhangwang commented on pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9083:
URL: https://github.com/apache/kafka/pull/9083#issuecomment-693127922


   @ableegoldman @abbccdda This is a minor cleanup PR that I found while working on KAFKA-9450.

