Posted to dev@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/11/13 02:26:58 UTC

[GitHub] [pulsar-client-node] coryvirok opened a new issue #133: How to clean up consumers/producers on SIGINT

coryvirok opened a new issue #133:
URL: https://github.com/apache/pulsar-client-node/issues/133


   First off, thanks for Pulsar and all of these client libs! I'm excited to vet Pulsar for my company and see if it solves some of Kafka's shortcomings. 
   
   
   I have a gRPC service that implements a server-streaming RPC. This RPC reads from a Pulsar topic and streams the data from the topic to the gRPC client. 
   
   This all works well until I try to gracefully shut down the gRPC server. I'm finding that the server hangs due to the Pulsar Consumer's synchronous `receive()` method. The only way I can get it to work is to use `consumer.receive(timeout)` and check the exception message, which seems fairly non-idiomatic for Node.js and makes it difficult to handle complex, nested shutdown logic in a real application.
   
   Here's a simplified example:
   
   ```js
   import Pulsar from "pulsar-client";
   
   const pulsarClient = new Pulsar.Client({
     serviceUrl: "pulsar://127.0.0.1:6650",
   });
   
   async function main() {
     const consumer = await pulsarClient.subscribe({
       topic: `test`,
       subscription: "default",
     });
   
     while (true) {
       try {
         // Since receive() is blocking, use this version in order to give up control and allow
         // the while loop to check if it should finish.
         await consumer.receive(1000);  
         console.log("received message");
       } catch (e) {
         // We are forced to use timeout exceptions and `e.message` matching to know when we should
         // retry the call to `receive(timeout)`
         if (e.message === "Failed to received message TimeOut") {
           console.log("receive() timed out, retrying");
           continue;
         } else if (e.message === "Failed to received message AlreadyClosed") {
           // We are forced to use another exception and `e.message` matching to know when the 
           // consumer was closed by the SIGINT handler
           console.log("consumer closed, breaking loop");
           break;
         }
         // Some other, legit exception that we should handle somewhere but has nothing to do with
         // control flow
         throw e;
       }
     }
   
     // Do whatever cleanup needs to happen
     console.log("CLEANING UP....");
   
     console.log("returning from main");
   }
   
   // Trap Ctrl-C from the keyboard
   // This could also be `process.on("beforeExit", () => {})` or any other signal handler
   //
   // Try commenting this out and seeing if "CLEANING UP..." is logged, (spoiler: it isn't)
   process.on("SIGINT", async () => {
     console.log("shutting down");
     await pulsarClient.close();
   });
   
   await main();
   ```
   
   I'm not a Node.js expert, but from my experience streams are usually managed via events. This pattern allows the developer to write event-driven code instead of imperative loops like the one above. This is helpful because it doesn't require exception handling and message matching to determine control flow. And it lets the developer call `consumer.close()` without worrying if the `consumer.receive()` sync method is blocking the event loop. 
   
   I understand the desire to use async/await here and I'm a HUGE fan of the pattern. But I think it's important for `consumer.receive()` to be async instead of sync. Otherwise, we're left having to write these loops using the timeout version of `receive(timeout)`.
   
   An example of how I'd expect to be able to use the Pulsar Consumer:
   
   ```js
   const consumer = await pulsarClient.subscribe({
     topic: `test`,
     subscription: "default",
   });
   
   consumer.on('data', (msg) => {
     // handle message from consumer
   });
   
   consumer.on('end', () => {
     // clean up any resources that should be cleaned up after a consumer is closed,
     // for various reasons, e.g. the Pulsar server went down, or the SIGINT handler called
     // consumer.close() or pulsarClient.close()
   });
   
   consumer.on('error', (err) => {
     // Consumer is no longer usable, but it received an error... do cleanup and report
     // the issue
   });
   ```
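   For illustration, here is a minimal sketch of how an interface like the one above could be approximated in user code on top of the existing promise-based `receive(timeout)`. It is not part of pulsar-client-node: `consumerEvents` and its `stop()` method are hypothetical names, and the two error strings are the ones matched in the first example, which other client versions may word differently. The polling and string matching still happen, but they are confined to one helper instead of every call site.
   
   ```javascript
   const { EventEmitter } = require("events");
   
   // Error messages matched in this issue (pulsar-client-node, late 2020).
   // Assumption: other client versions may word these differently.
   const TIMEOUT_MSG = "Failed to received message TimeOut";
   const CLOSED_MSG = "Failed to received message AlreadyClosed";
   
   // Hypothetical helper (not part of pulsar-client): polls consumer.receive(pollMs)
   // and re-emits the results as 'data', 'end' and 'error' events.
   function consumerEvents(consumer, pollMs = 1000) {
     const emitter = new EventEmitter();
     let stopped = false;
   
     // Ask the poll loop to exit once the in-flight receive() settles.
     emitter.stop = () => {
       stopped = true;
     };
   
     (async () => {
       while (!stopped) {
         let msg;
         try {
           msg = await consumer.receive(pollMs);
         } catch (e) {
           if (e.message === TIMEOUT_MSG) continue; // nothing arrived; poll again
           if (e.message === CLOSED_MSG) break; // consumer or client closed elsewhere
           // Anything else is a real failure. As with Node streams, emitting
           // 'error' with no listener attached throws (here: an unhandled rejection).
           emitter.emit("error", e);
           return;
         }
         emitter.emit("data", msg);
       }
       emitter.emit("end");
     })();
   
     return emitter;
   }
   ```
   
   A caller would attach `data`, `end` and `error` listeners as in the snippet above. Per the first example, once a SIGINT handler calls `pulsarClient.close()` the loop sees the `AlreadyClosed` error, which this helper turns into an `end` event.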
   
   My question is: is this the right way to use this library? Am I missing something, or do we need to use the sync methods `consumer.receive()` and `consumer.receive(timeout)` to release control to the main loop so that we can do things like shut down the process?
   
   Thanks in advance!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-node] coryvirok commented on issue #133: How to clean up consumers/producers on SIGINT

Posted by GitBox <gi...@apache.org>.
coryvirok commented on issue #133:
URL: https://github.com/apache/pulsar-client-node/issues/133#issuecomment-733844778


   Correct. Thanks for clarifying the roadmap. I'll think about whether there's a way to make this more idiomatic without having to change the C++ core.
   
   My current workaround is to pass an EventEmitter to the function that runs the `receive(timeout)` loop, and to check whether it has fired before each iteration of the loop.
   
   E.g.
   ```js
   async function doWork(shutdownEvent) {
     let isShuttingDown = false;
     const consumer = await pulsarClient.subscribe({
       topic: `test`,
       subscription: "default",
     });
   
     // Set the flag when the shutdown emitter fires (e.g. from a SIGINT handler)
     shutdownEvent.once('close', () => {
       isShuttingDown = true;
     });
   
     while (!isShuttingDown) {
       try {
         // Since receive() is blocking, use this version in order to give up control and allow
         // the while loop to check if it should finish.
         await consumer.receive(1000);
         console.log("received message");
       } catch (e) {
         // ... (error handling elided; see the first example)
       }
     }
   
     // The loop has exited, so no receive() is pending and cleanup can run here
     await consumer.close();
   }
   ```
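   The flag-based workaround above can be factored into a reusable function. The following is a sketch under stated assumptions: `receiveUntilShutdown` and its `onMessage` callback are hypothetical names, and the timeout string is the one matched in the first example. Because the flag is only checked between `receive(timeout)` calls, shutdown is noticed within roughly one polling interval, and the consumer is closed only after the loop has exited, so no `receive()` is still pending when `close()` runs.
   
   ```javascript
   // Assumption: the timeout error text matches the one observed in this issue.
   const TIMEOUT_MSG = "Failed to received message TimeOut";
   
   // Hypothetical helper generalizing the EventEmitter-flag workaround:
   // poll receive(pollMs) until shutdownEvent emits 'close', then close the consumer.
   async function receiveUntilShutdown(consumer, shutdownEvent, onMessage, pollMs = 1000) {
     let isShuttingDown = false;
     shutdownEvent.once("close", () => {
       isShuttingDown = true;
     });
   
     while (!isShuttingDown) {
       let msg;
       try {
         msg = await consumer.receive(pollMs);
       } catch (e) {
         // A timeout just means "no message yet"; loop around and re-check the flag.
         if (e.message === TIMEOUT_MSG) continue;
         throw e; // anything else is a real error for the caller to handle
       }
       await onMessage(msg);
     }
   
     // The loop has exited, so no receive() is in flight when we close.
     await consumer.close();
   }
   ```
   
   Hooking this up to Ctrl-C could look like `process.once("SIGINT", () => shutdownEvent.emit("close"))`, followed by `await pulsarClient.close()` once `receiveUntilShutdown` resolves. Registering the handler with `once` means a second Ctrl-C should fall back to Node's default behavior and terminate the process.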





[GitHub] [pulsar-client-node] magrain commented on issue #133: How to clean up consumers/producers on SIGINT

Posted by GitBox <gi...@apache.org>.
magrain commented on issue #133:
URL: https://github.com/apache/pulsar-client-node/issues/133#issuecomment-734742498


   Thank you.
   Please let me know if you have any better ideas.





[GitHub] [pulsar-client-node] coryvirok closed issue #133: How to clean up consumers/producers on SIGINT

Posted by GitBox <gi...@apache.org>.
coryvirok closed issue #133:
URL: https://github.com/apache/pulsar-client-node/issues/133


   





[GitHub] [pulsar-client-node] magrain commented on issue #133: How to clean up consumers/producers on SIGINT

Posted by GitBox <gi...@apache.org>.
magrain commented on issue #133:
URL: https://github.com/apache/pulsar-client-node/issues/133#issuecomment-733515219


   Let me confirm the points of this issue:
   
   1. You want to avoid using:
      * `receive` with a timeout
      * exception handling based on error messages
   
   2. You would like:
      * an "async" receive
      * event-driven code like `consumer.on('data', ...)`
   
   Is my understanding right?
   
   If so, your understanding and code are correct.
   Unfortunately, pulsar-client-node doesn't support the event-based pattern you mentioned, and we don't have any plans for it for now.
   (If you have any ideas, discussion and contributions are welcome.)

