You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2023/01/11 15:34:38 UTC

[GitHub] [kafka] mumrah commented on a diff in pull request #13089: KAFKA-14601: Improve exception handling in KafkaEventQueue

mumrah commented on code in PR #13089:
URL: https://github.com/apache/kafka/pull/13089#discussion_r1067114622


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala:
##########
@@ -61,7 +61,7 @@ class BrokerMetadataSnapshotter(
   /**
    * The event queue which runs this listener.
    */
-  val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""))
+  val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""), () => new ShutdownEvent())

Review Comment:
   Why the supplier here for ShutdownEvent? It looks like KafkaEventQueue just takes in the event object



##########
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##########
@@ -116,33 +116,33 @@ boolean isSingleton() {
         /**
          * Run the event associated with this EventContext.
          */
-        void run(Logger log) throws InterruptedException {
-            try {
-                event.run();
-            } catch (InterruptedException e) {
-                throw e;
-            } catch (Exception e) {
+        boolean run(Logger log, Throwable exceptionToDeliver) {

Review Comment:
   Can we update the javadoc of `run` to include the new param and the meaning of the return value



##########
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##########
@@ -116,33 +116,33 @@ boolean isSingleton() {
         /**
          * Run the event associated with this EventContext.
          */
-        void run(Logger log) throws InterruptedException {
-            try {
-                event.run();
-            } catch (InterruptedException e) {
-                throw e;
-            } catch (Exception e) {
+        boolean run(Logger log, Throwable exceptionToDeliver) {
+            if (exceptionToDeliver == null) {
                 try {
-                    event.handleException(e);
-                } catch (Throwable t) {
-                    log.error("Unexpected exception in handleException", t);
+                    event.run();
+                } catch (InterruptedException e) {
+                    log.warn("Interrupted while running event. Shutting down event queue");
+                    return true;
+                } catch (Throwable e) {
+                    exceptionToDeliver = e;
                 }
             }
-        }
-
-        /**
-         * Complete the event associated with this EventContext with a timeout exception.
-         */
-        void completeWithTimeout() {
-            completeWithException(new TimeoutException());
+            if (exceptionToDeliver != null) {
+                completeWithException(log, exceptionToDeliver);
+            }
+            return Thread.currentThread().isInterrupted();
         }
 
         /**
          * Complete the event associated with this EventContext with the specified
          * exception.
          */
-        void completeWithException(Throwable t) {
-            event.handleException(t);
+        void completeWithException(Logger log, Throwable t) {
+            try {
+                event.handleException(t);

Review Comment:
   One thing I've run into before is exceptions in Event#run getting swallowed by the default `handleException` implementation. Is it worth adding logging for any exception thrown in Event#run? 



##########
server-common/src/test/resources/test/log4j.properties:
##########
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+log4j.rootLogger=DEBUG, stdout

Review Comment:
   Was this file added intentionally? If so, we should probably decrease the level here to WARN to avoid spamming Jenkins with test logging



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org