You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/17 10:52:13 UTC

[GitHub] [kafka] feyman2016 opened a new pull request #8887: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager…

feyman2016 opened a new pull request #8887:
URL: https://github.com/apache/kafka/pull/8887


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #8887: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager…

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8887:
URL: https://github.com/apache/kafka/pull/8887#discussion_r442574574



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1048,4 +1042,28 @@ public String toString(final String indent) {
     Set<TaskId> lockedTaskDirectories() {
         return Collections.unmodifiableSet(lockedTaskDirectories);
     }
+
+    public static void executeAndMaybeSwallow(final boolean clean,
+                                              final Runnable runnable,
+                                              final java.util.function.Consumer<RuntimeException> actionIfClean,
+                                              final java.util.function.Consumer<RuntimeException> actionIfNotClean) {

Review comment:
       I am wondering if adding this method to `TaskManager` is the best choice (cf https://issues.apache.org/jira/browse/KAFKA-10055). Thoughts?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #8887: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager…

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8887:
URL: https://github.com/apache/kafka/pull/8887#discussion_r442574113



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -741,29 +741,23 @@ void shutdown(final boolean clean) {
 
         for (final Task task : tasks.values()) {
             if (task.isActive()) {
-                try {
-                    activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
-                } catch (final RuntimeException e) {
-                    if (clean) {
-                        firstException.compareAndSet(null, e);
-                    } else {
-                        log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e);
-                    }
-                }
+                executeAndMaybeSwallow(
+                    clean,
+                    () -> activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()),
+                    e -> firstException.compareAndSet(null, e),
+                    e -> log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e)
+                );
             }
         }
 
         tasks.clear();
 
-        try {
-            activeTaskCreator.closeThreadProducerIfNeeded();
-        } catch (final RuntimeException e) {
-            if (clean) {
-                firstException.compareAndSet(null, e);
-            } else {
-                log.warn("Ignoring an exception while closing thread producer.", e);
-            }
-        }
+        executeAndMaybeSwallow(
+            clean,
+            () -> activeTaskCreator.closeThreadProducerIfNeeded(),

Review comment:
       This is a lambda. Do you mean method reference? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] feyman2016 commented on a change in pull request #8887: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager…

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #8887:
URL: https://github.com/apache/kafka/pull/8887#discussion_r443105822



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -741,29 +741,23 @@ void shutdown(final boolean clean) {
 
         for (final Task task : tasks.values()) {
             if (task.isActive()) {
-                try {
-                    activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
-                } catch (final RuntimeException e) {
-                    if (clean) {
-                        firstException.compareAndSet(null, e);
-                    } else {
-                        log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e);
-                    }
-                }
+                executeAndMaybeSwallow(
+                    clean,
+                    () -> activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()),
+                    e -> firstException.compareAndSet(null, e),
+                    e -> log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e)
+                );
             }
         }
 
         tasks.clear();
 
-        try {
-            activeTaskCreator.closeThreadProducerIfNeeded();
-        } catch (final RuntimeException e) {
-            if (clean) {
-                firstException.compareAndSet(null, e);
-            } else {
-                log.warn("Ignoring an exception while closing thread producer.", e);
-            }
-        }
+        executeAndMaybeSwallow(
+            clean,
+            () -> activeTaskCreator.closeThreadProducerIfNeeded(),

Review comment:
       Updated with method reference, thanks!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #8887: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager…

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8887:
URL: https://github.com/apache/kafka/pull/8887#discussion_r443159967



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -741,29 +741,23 @@ void shutdown(final boolean clean) {
 
         for (final Task task : tasks.values()) {
             if (task.isActive()) {
-                try {
-                    activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
-                } catch (final RuntimeException e) {
-                    if (clean) {
-                        firstException.compareAndSet(null, e);
-                    } else {
-                        log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e);
-                    }
-                }
+                executeAndMaybeSwallow(
+                    clean,
+                    () -> activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()),
+                    e -> firstException.compareAndSet(null, e),
+                    e -> log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e)
+                );
             }
         }
 
         tasks.clear();
 
-        try {
-            activeTaskCreator.closeThreadProducerIfNeeded();
-        } catch (final RuntimeException e) {
-            if (clean) {
-                firstException.compareAndSet(null, e);
-            } else {
-                log.warn("Ignoring an exception while closing thread producer.", e);
-            }
-        }
+        executeAndMaybeSwallow(
+            clean,
+            () -> activeTaskCreator.closeThreadProducerIfNeeded(),

Review comment:
       Oh yea! @mjsax 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #8887: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager…

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8887:
URL: https://github.com/apache/kafka/pull/8887#discussion_r443837837



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1048,4 +1042,28 @@ public String toString(final String indent) {
     Set<TaskId> lockedTaskDirectories() {
         return Collections.unmodifiableSet(lockedTaskDirectories);
     }
+
+    public static void executeAndMaybeSwallow(final boolean clean,
+                                              final Runnable runnable,
+                                              final java.util.function.Consumer<RuntimeException> actionIfClean,
+                                              final java.util.function.Consumer<RuntimeException> actionIfNotClean) {

Review comment:
       I'm fine as well, will make a reference to 10055 of this PR




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda merged pull request #8887: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager…

Posted by GitBox <gi...@apache.org>.
abbccdda merged pull request #8887:
URL: https://github.com/apache/kafka/pull/8887


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] feyman2016 commented on pull request #8887: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager…

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on pull request #8887:
URL: https://github.com/apache/kafka/pull/8887#issuecomment-647856516


   @abbccdda @mjsax Thanks a lot!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] feyman2016 commented on a change in pull request #8887: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager…

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #8887:
URL: https://github.com/apache/kafka/pull/8887#discussion_r441468868



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1048,4 +1042,28 @@ public String toString(final String indent) {
     Set<TaskId> lockedTaskDirectories() {
         return Collections.unmodifiableSet(lockedTaskDirectories);
     }
+
+    public static void executeAndMaybeSwallow(final boolean clean,
+                                              final Runnable runnable,
+                                              final java.util.function.Consumer<RuntimeException> actionIfClean,
+                                              final java.util.function.Consumer<RuntimeException> actionIfNotClean) {

Review comment:
       At first thought, the not `clean` case normally just do some logs. with only the log string different,  I would like to still use the `executeAndMaybeSwallow ` in `Task.scala`, but since callers may need different log levels or they may want do more than just a log, so just pass a more generic argument `actionIfNotClean`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on pull request #8887: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager…

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8887:
URL: https://github.com/apache/kafka/pull/8887#issuecomment-647778576


   Only one flaky test failed:
   ```
   org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorStop
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] feyman2016 commented on a change in pull request #8887: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager…

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #8887:
URL: https://github.com/apache/kafka/pull/8887#discussion_r443096402



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1048,4 +1042,28 @@ public String toString(final String indent) {
     Set<TaskId> lockedTaskDirectories() {
         return Collections.unmodifiableSet(lockedTaskDirectories);
     }
+
+    public static void executeAndMaybeSwallow(final boolean clean,
+                                              final Runnable runnable,
+                                              final java.util.function.Consumer<RuntimeException> actionIfClean,
+                                              final java.util.function.Consumer<RuntimeException> actionIfNotClean) {

Review comment:
       @abbccdda :(
   @mjsax 
   I have similar thoughts when doing this, but I don't know enough context to decide whether this method is general enough to be extracted out to a utility class. If there are more similar static methods in other places, then I think we should put it in the utility class.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8887: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager…

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8887:
URL: https://github.com/apache/kafka/pull/8887#issuecomment-647718486


   Retest this please.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #8887: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager…

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8887:
URL: https://github.com/apache/kafka/pull/8887#discussion_r441954245



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1048,4 +1042,28 @@ public String toString(final String indent) {
     Set<TaskId> lockedTaskDirectories() {
         return Collections.unmodifiableSet(lockedTaskDirectories);
     }
+
+    public static void executeAndMaybeSwallow(final boolean clean,
+                                              final Runnable runnable,
+                                              final java.util.function.Consumer<RuntimeException> actionIfClean,
+                                              final java.util.function.Consumer<RuntimeException> actionIfNotClean) {

Review comment:
       It seems not possible in Java

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -741,29 +741,23 @@ void shutdown(final boolean clean) {
 
         for (final Task task : tasks.values()) {
             if (task.isActive()) {
-                try {
-                    activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
-                } catch (final RuntimeException e) {
-                    if (clean) {
-                        firstException.compareAndSet(null, e);
-                    } else {
-                        log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e);
-                    }
-                }
+                executeAndMaybeSwallow(
+                    clean,
+                    () -> activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()),
+                    e -> firstException.compareAndSet(null, e),
+                    e -> log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e)
+                );
             }
         }
 
         tasks.clear();
 
-        try {
-            activeTaskCreator.closeThreadProducerIfNeeded();
-        } catch (final RuntimeException e) {
-            if (clean) {
-                firstException.compareAndSet(null, e);
-            } else {
-                log.warn("Ignoring an exception while closing thread producer.", e);
-            }
-        }
+        executeAndMaybeSwallow(
+            clean,
+            () -> activeTaskCreator.closeThreadProducerIfNeeded(),

Review comment:
       could be replaced by lambda.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #8887: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager…

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8887:
URL: https://github.com/apache/kafka/pull/8887#discussion_r443768571



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1048,4 +1042,28 @@ public String toString(final String indent) {
     Set<TaskId> lockedTaskDirectories() {
         return Collections.unmodifiableSet(lockedTaskDirectories);
     }
+
+    public static void executeAndMaybeSwallow(final boolean clean,
+                                              final Runnable runnable,
+                                              final java.util.function.Consumer<RuntimeException> actionIfClean,
+                                              final java.util.function.Consumer<RuntimeException> actionIfNotClean) {

Review comment:
       I am also fine to just merge this as-is, and refactor via KAFKA-10055.
   
   \cc @abbccdda WDYT?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] feyman2016 commented on a change in pull request #8887: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager…

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #8887:
URL: https://github.com/apache/kafka/pull/8887#discussion_r441468868



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1048,4 +1042,28 @@ public String toString(final String indent) {
     Set<TaskId> lockedTaskDirectories() {
         return Collections.unmodifiableSet(lockedTaskDirectories);
     }
+
+    public static void executeAndMaybeSwallow(final boolean clean,
+                                              final Runnable runnable,
+                                              final java.util.function.Consumer<RuntimeException> actionIfClean,
+                                              final java.util.function.Consumer<RuntimeException> actionIfNotClean) {

Review comment:
       At first thought, the not `clean` case normally just do some logs. with only the log string different,  I would like to still use the `executeAndMaybeSwallow ` in `Task.scala`, but since callers may need different log levels or they may want do more than just a log, so just pass a more generic argument `actionIfNotClean`.
   And yeah, `java.util.function.Consumer` looks ugly but I am not aware of some elegant way for import alias...




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org