Posted to commits@pulsar.apache.org by "eolivelli (via GitHub)" <gi...@apache.org> on 2023/02/21 07:45:13 UTC

[GitHub] [pulsar] eolivelli opened a new issue, #19579: Protocol Handlers: close() should be an async operation

eolivelli opened a new issue, #19579:
URL: https://github.com/apache/pulsar/issues/19579

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.
   
   
   ### Motivation
   
   When the broker shuts down, many subcomponents have to be shut down as well, in particular the Protocol Handlers.
   Most of the Broker's shutdown (close) procedure is async, but closing the Protocol Handlers is still a blocking operation.
   Protocol Handlers, like KOP/Starlight for Kafka, often start thread pools and Pulsar Clients. To attempt a graceful shutdown, the PH may wait for some resources to be disposed, but that disposal can be deferred for a long time (because the broker is also shutting down and some resources are no longer available, leading to errors and backoffs).
   
   The shutdown procedure of the Broker should be as quick as possible in order to prevent latency spikes and other unwanted consequences due to having a partially working broker.
   
   [This](https://github.com/datastax/starlight-for-kafka/actions/runs/4224366992/jobs/7335231134) is an example of a test that failed due to a timeout on Starlight for Kafka.
   
   ```
   
   Error:  testConnectListenerNotExist(io.streamnative.pulsar.handlers.kop.KafkaListenerNameTest)  Time elapsed: 20.037 s  <<< FAILURE!
   org.testng.internal.thread.ThreadTimeoutException: Method io.streamnative.pulsar.handlers.kop.KafkaListenerNameTest.testConnectListenerNotExist() didn't finish within the time-out 20000
   	at java.base@11.0.18/java.lang.StackStreamFactory$AbstractStackWalker.callStackWalk(Native Method)
   	at java.base@11.0.18/java.lang.StackStreamFactory$AbstractStackWalker.beginStackWalk(StackStreamFactory.java:370)
   	at java.base@11.0.18/java.lang.StackStreamFactory$AbstractStackWalker.walk(StackStreamFactory.java:243)
   	at java.base@11.0.18/java.lang.StackWalker.walk(StackWalker.java:498)
   	at app//org.apache.logging.log4j.util.StackLocator.calcLocation(StackLocator.java:96)
   	at app//org.apache.logging.log4j.util.StackLocatorUtil.calcLocation(StackLocatorUtil.java:99)
   	at app//org.apache.logging.log4j.spi.AbstractLogger.getLocation(AbstractLogger.java:2216)
   	at app//org.apache.logging.log4j.spi.AbstractLogger.logMessageTrackRecursion(AbstractLogger.java:2159)
   	at app//org.apache.logging.log4j.spi.AbstractLogger.logMessageSafely(AbstractLogger.java:2142)
   	at app//org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:2040)
   	at app//org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:1907)
   	at app//org.apache.logging.slf4j.Log4jLogger.warn(Log4jLogger.java:249)
   	at app//io.streamnative.pulsar.handlers.kop.AbstractPulsarClient.close(AbstractPulsarClient.java:49)
   	at app//io.streamnative.pulsar.handlers.kop.KafkaProtocolHandler.close(KafkaProtocolHandler.java:578)
   	at app//org.apache.pulsar.broker.protocol.ProtocolHandlerWithClassLoader.close(ProtocolHandlerWithClassLoader.java:90)
   	at app//org.apache.pulsar.broker.protocol.ProtocolHandlers$$Lambda$1438/0x0000000100b38040.accept(Unknown Source)
   	at java.base@11.0.18/java.lang.Iterable.forEach(Iterable.java:75)
   	at app//org.apache.pulsar.broker.protocol.ProtocolHandlers.close(ProtocolHandlers.java:154)
   	at app//org.apache.pulsar.broker.PulsarService.closeAsync(PulsarService.java:458)
   	at app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588.closeAsync$accessor$pCM5WTps(Unknown Source)
   	at app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588$auxiliary$4dJQHnOb.call(Unknown Source)
   	at app//org.mockito.internal.invocation.RealMethod$FromCallable$1.call(RealMethod.java:40)
   	at app//org.mockito.internal.invocation.RealMethod$FromBehavior.invoke(RealMethod.java:62)
   	at app//org.mockito.internal.invocation.InterceptedInvocation.callRealMethod(InterceptedInvocation.java:127)
   	at app//org.mockito.internal.stubbing.answers.CallsRealMethods.answer(CallsRealMethods.java:43)
   	at app//org.mockito.Answers.answer(Answers.java:100)
   	at app//org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:103)
   	at app//org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
   	at app//org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:35)
   	at app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:63)
   	at app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:49)
   	at app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor$DispatcherDefaultingToRealMethod.interceptSuperCallable(MockMethodInterceptor.java:110)
   	at app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588.closeAsync(Unknown Source)
   	at app//org.apache.pulsar.broker.PulsarService.close(PulsarService.java:380)
   	at app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588.close$accessor$pCM5WTps(Unknown Source)
   	at app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588$auxiliary$Lvw9o3WA.call(Unknown Source)
   	at app//org.mockito.internal.invocation.RealMethod$FromCallable$1.call(RealMethod.java:40)
   	at app//org.mockito.internal.invocation.RealMethod$FromBehavior.invoke(RealMethod.java:62)
   	at app//org.mockito.internal.invocation.InterceptedInvocation.callRealMethod(InterceptedInvocation.java:127)
   	at app//org.mockito.internal.stubbing.answers.CallsRealMethods.answer(CallsRealMethods.java:43)
   	at app//org.mockito.Answers.answer(Answers.java:100)
   	at app//org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:103)
   	at app//org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
   	at app//org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:35)
   	at app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:63)
   	at app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:49)
   	at app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor$DispatcherDefaultingToRealMethod.interceptSuperCallable(MockMethodInterceptor.java:110)
   	at app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588.close(Unknown Source)
   	at app//io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase.stopBroker(KopProtocolHandlerTestBase.java:411)
   	at app//io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase.stopBroker(KopProtocolHandlerTestBase.java:415)
   	at app//io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase.internalCleanup(KopProtocolHandlerTestBase.java:379)
   	at app//io.streamnative.pulsar.handlers.kop.KafkaListenerNameTest.testConnectListenerNotExist(KafkaListenerNameTest.java:211)
   	at java.base@11.0.18/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at java.base@11.0.18/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at java.base@11.0.18/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.base@11.0.18/java.lang.reflect.Method.invoke(Method.java:566)
   	at app//org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
   	at app//org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:54)
   	at app//org.testng.internal.InvokeMethodRunnable.run(InvokeMethodRunnable.java:44)
   	at java.base@11.0.18/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
   	at java.base@11.0.18/java.util.concurrent.FutureTask.run(FutureTask.java:264)
   	at java.base@11.0.18/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   	at java.base@11.0.18/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   	at java.base@11.0.18/java.lang.Thread.run(Thread.java:829)
   ```
   
   
   ### Solution
   
   The ProtocolHandler API should provide a closeAsync() method instead of a blocking close().
   
   We could make the API backward compatible by leveraging Java default methods:
   
   
   ```
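   public default CompletableFuture<?> closeAsync() {
       CompletableFuture<?> result = new CompletableFuture<>();
       try {
           // Fall back to the existing blocking close() for handlers
           // that do not override closeAsync().
           this.close();
           result.complete(null);
       } catch (Throwable t) {
           // TODO handle InterruptedException here
           result.completeExceptionally(t);
       }
       // Already completed, normally or exceptionally, at this point.
       return result;
   }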
   ```
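   
   To illustrate how the broker side could consume such an API, here is a minimal sketch and not the actual Pulsar code. `AsyncCloseSketch`, `Handler` and `closeAll` are hypothetical names introduced only for this example; `Handler` stands in for the real ProtocolHandler interface. Note that a handler relying on the default `closeAsync()` still blocks the caller, so only handlers that override it with a truly non-blocking implementation would shorten the shutdown.
   
   ```java
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   public class AsyncCloseSketch {

       /** Hypothetical stand-in for the ProtocolHandler API. */
       public interface Handler {
           void close();

           // Backward-compatible default that wraps the blocking close().
           default CompletableFuture<Void> closeAsync() {
               CompletableFuture<Void> result = new CompletableFuture<>();
               try {
                   close();
                   result.complete(null);
               } catch (Throwable t) {
                   result.completeExceptionally(t);
               }
               return result;
           }
       }

       /**
        * Calls closeAsync() on every handler and returns a future that
        * completes once all of them have finished closing.
        */
       public static CompletableFuture<Void> closeAll(List<? extends Handler> handlers) {
           CompletableFuture<?>[] futures = handlers.stream()
                   // Swallow the failure of a single handler so that it
                   // does not fail the aggregated future (assumed policy).
                   .map(h -> h.closeAsync().exceptionally(t -> null))
                   .toArray(CompletableFuture[]::new);
           return CompletableFuture.allOf(futures);
       }
   }
   ```
   
   The caller can then chain the returned future into the rest of the async shutdown instead of iterating over the handlers with a blocking `forEach`. Swallowing per-handler failures is an assumption of this sketch; a real implementation would probably at least log them.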
   
   
   
   ### Alternatives
   
   Remove the close() API entirely and break compatibility.
   
   Rejected because there are already a few ProtocolHandlers and we would harm the community by breaking compatibility.
   
   ### Anything else?
   
   We should port this small API change to the stable branches, especially 2.10.x, which is the latest version that supports JDK 8.
   
   This change is needed to improve the shutdown procedure, which can otherwise lead to huge latency spikes.
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] MMirelli commented on issue #19579: Protocol Handlers: close() should be an async operation

Posted by "MMirelli (via GitHub)" <gi...@apache.org>.
MMirelli commented on issue #19579:
URL: https://github.com/apache/pulsar/issues/19579#issuecomment-1440693925

   I will pick this one.




[GitHub] [pulsar] github-actions[bot] commented on issue #19579: Protocol Handlers: close() should be an async operation

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #19579:
URL: https://github.com/apache/pulsar/issues/19579#issuecomment-1491180246

   The issue had no activity for 30 days, mark with Stale label.

