You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/25 02:01:30 UTC

[pulsar] branch master updated: Handle RejectedExecutionException at closing ManagedLedgerClientFactory (#2646)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 371f1db  Handle RejectedExecutionException at closing ManagedLedgerClientFactory (#2646)
371f1db is described below

commit 371f1db8598be6b6ce11dfbadf7525e0e71e802f
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Mon Sep 24 19:01:26 2018 -0700

    Handle RejectedExecutionException at closing ManagedLedgerClientFactory (#2646)
    
    *Motivation*
    
    The following stack trace was observed in CI:
    
    ```
    Error Message
    java.io.IOException: java.util.concurrent.RejectedExecutionException: Task org.apache.bookkeeper.mledger.util.SafeRun$1@7ec176da rejected from java.util.concurrent.ThreadPoolExecutor@33d2f9d3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2377]
    Stacktrace
    org.apache.pulsar.broker.PulsarServerException: java.io.IOException: java.util.concurrent.RejectedExecutionException: Task org.apache.bookkeeper.mledger.util.SafeRun$1@7ec176da rejected from java.util.concurrent.ThreadPoolExecutor@33d2f9d3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2377]
    	at org.apache.pulsar.broker.PulsarService.close(PulsarService.java:298)
    	at org.apache.pulsar.functions.worker.PulsarWorkerAssignmentTest.shutdown(PulsarWorkerAssignmentTest.java:147)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
    	at org.testng.internal.MethodInvocationHelper.invokeMethodConsideringTimeout(MethodInvocationHelper.java:59)
    	at org.testng.internal.Invoker.invokeConfigurationMethod(Invoker.java:451)
    	at org.testng.internal.Invoker.invokeConfigurations(Invoker.java:222)
    	at org.testng.internal.Invoker.invokeMethod(Invoker.java:634)
    	at org.testng.internal.Invoker.retryFailed(Invoker.java:829)
    	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1000)
    	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
    	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
    	at org.testng.TestRunner.privateRun(TestRunner.java:648)
    	at org.testng.TestRunner.run(TestRunner.java:505)
    	at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
    	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
    	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
    	at org.testng.SuiteRunner.run(SuiteRunner.java:364)
    	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
    	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
    	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1187)
    	at org.testng.TestNG.runSuitesLocally(TestNG.java:1116)
    	at org.testng.TestNG.runSuites(TestNG.java:1028)
    	at org.testng.TestNG.run(TestNG.java:996)
    	at org.apache.maven.surefire.testng.TestNGExecutor.run(TestNGExecutor.java:135)
    	at org.apache.maven.surefire.testng.TestNGDirectoryTestSuite.executeSingleClass(TestNGDirectoryTestSuite.java:112)
    	at org.apache.maven.surefire.testng.TestNGDirectoryTestSuite.execute(TestNGDirectoryTestSuite.java:99)
    	at org.apache.maven.surefire.testng.TestNGProvider.invoke(TestNGProvider.java:146)
    	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:379)
    	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:340)
    	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:125)
    	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:413)
    Caused by: java.io.IOException: java.util.concurrent.RejectedExecutionException: Task org.apache.bookkeeper.mledger.util.SafeRun$1@7ec176da rejected from java.util.concurrent.ThreadPoolExecutor@33d2f9d3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2377]
    	at org.apache.pulsar.broker.ManagedLedgerClientFactory.close(ManagedLedgerClientFactory.java:70)
    	at org.apache.pulsar.broker.PulsarService.close(PulsarService.java:225)
    	... 34 more
    Caused by: java.util.concurrent.RejectedExecutionException: Task org.apache.bookkeeper.mledger.util.SafeRun$1@7ec176da rejected from java.util.concurrent.ThreadPoolExecutor@33d2f9d3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2377]
    	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    	at com.google.common.util.concurrent.ForwardingExecutorService.execute(ForwardingExecutorService.java:99)
    	at org.apache.bookkeeper.common.util.BoundedExecutorService.execute(BoundedExecutorService.java:89)
    	at org.apache.bookkeeper.common.util.OrderedExecutor.executeOrdered(OrderedExecutor.java:291)
    	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$initializeBookKeeper$2(ManagedLedgerImpl.java:376)
    	at org.apache.bookkeeper.client.LedgerCreateOp.createComplete(LedgerCreateOp.java:240)
    	at org.apache.bookkeeper.client.LedgerCreateOp.operationComplete(LedgerCreateOp.java:199)
    	at org.apache.bookkeeper.client.LedgerCreateOp.operationComplete(LedgerCreateOp.java:59)
    	at org.apache.bookkeeper.meta.CleanupLedgerManager.close(CleanupLedgerManager.java:232)
    	at org.apache.bookkeeper.client.BookKeeper.close(BookKeeper.java:1467)
    	at org.apache.pulsar.broker.ManagedLedgerClientFactory.close(ManagedLedgerClientFactory.java:66)
    	... 35 more
    
    ```
    
    The race condition happens as follows:
    
    When the BookKeeper client is closed, it errors out all pending metadata operations.
    The callbacks of those operations are then triggered and submitted to the scheduler
    of the managed ledger factory. However, the managed ledger factory has already been
    shut down at that point, so a `RejectedExecutionException` is thrown.
    
    This exception can safely be ignored.
    
    *Changes*
    
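    - Catch `RejectedExecutionException` raised by `bkClient.close()` in
      `ManagedLedgerClientFactory.close()` and log it as a warning instead of failing the close.
    - Make `PulsarWorkerAssignmentTest.shutdown()` log, rather than propagate, errors raised during teardown.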
---
 .../apache/pulsar/broker/ManagedLedgerClientFactory.java | 14 +++++++++++++-
 .../functions/worker/PulsarWorkerAssignmentTest.java     | 16 ++++++++++------
 2 files changed, 23 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 3189df2..2b82204 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker;
 import java.io.Closeable;
 import java.io.IOException;
 
+import java.util.concurrent.RejectedExecutionException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
@@ -63,7 +64,18 @@ public class ManagedLedgerClientFactory implements Closeable {
             managedLedgerFactory.shutdown();
             log.info("Closed managed ledger factory");
 
-            bkClient.close();
+            try {
+                bkClient.close();
+            } catch (RejectedExecutionException ree) {
+                // when closing bookkeeper client, it will error outs all pending metadata operations.
+                // those callbacks of those operations will be triggered, and submitted to the scheduler
+                // in managed ledger factory. but the managed ledger factory has been shutdown before,
+                // so `RejectedExecutionException` will be thrown there. we can safely ignore this exception.
+                //
+                // an alternative solution is to close bookkeeper client before shutting down managed ledger
+                // factory, however that might be introducing more unknowns.
+                log.warn("Encountered exceptions on closing bookkeeper client", ree);
+            }
             log.info("Closed BookKeeper client");
         } catch (Exception e) {
             log.warn(e.getMessage(), e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index fe28a51..8f18259 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -139,13 +139,17 @@ public class PulsarWorkerAssignmentTest {
     }
 
     @AfterMethod
-    void shutdown() throws Exception {
+    void shutdown() {
         log.info("--- Shutting down ---");
-        pulsarClient.close();
-        admin.close();
-        functionsWorkerService.stop();
-        pulsar.close();
-        bkEnsemble.stop();
+        try {
+            pulsarClient.close();
+            admin.close();
+            functionsWorkerService.stop();
+            pulsar.close();
+            bkEnsemble.stop();
+        } catch (Exception e) {
+            log.warn("Encountered errors at shutting down PulsarWorkerAssignmentTest", e);
+        }
     }
 
     private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {