You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "aromanenko-dev (via GitHub)" <gi...@apache.org> on 2023/04/20 15:21:24 UTC

[GitHub] [beam] aromanenko-dev commented on a diff in pull request #26051: Support Dataflow runner v2 in SparkRecieverIO

aromanenko-dev commented on code in PR #26051:
URL: https://github.com/apache/beam/pull/26051#discussion_r1172742005


##########
sdks/java/io/sparkreceiver/2/src/test/java/org/apache/beam/sdk/io/sparkreceiver/RabbitMqReceiverWithOffset.java:
##########
@@ -101,47 +102,69 @@ private void receive() {
       channel.queueDeclare(
           streamName, true, false, false, Collections.singletonMap("x-queue-type", "stream"));
       channel.basicQos(Math.min(MAX_PREFETCH_COUNT, (int) totalMessagesNumber));
-      testConsumer = new TestConsumer(this, channel, this::store);
+      final TestConsumer testConsumer = new TestConsumer(channel, this::store, isStopped);
 
       channel.basicConsume(
           streamName,
           false,
-          Collections.singletonMap("x-stream-offset", currentOffset),
+          Collections.singletonMap("x-stream-offset", startOffset),
           testConsumer);
     } catch (Exception e) {
       LOG.error("Can not basic consume", e);
       throw new RuntimeException(e);
     }
+  }
 
-    while (!isStopped()) {
-      try {
-        TimeUnit.MILLISECONDS.sleep(READ_TIMEOUT_IN_MS);
-      } catch (InterruptedException e) {
-        LOG.error("Interrupted", e);
+  @Override
+  public void stop(String message) {
+    LOG.info(message);
+    isStopped.set(true);
+    super.stop(message);
+    try {
+      if (recordsProcessed != 0) {
+        LOG.info("Try to multiple ack on {}", recordsProcessed);
+        channel.basicAck(recordsProcessed, true);
       }
+      channel.abort();
+      connection.close();
+      LOG.info("RabbitMQ channel and connection were closed");
+    } catch (Exception e) {
+      LOG.error("Exception during stopping of the RabbitMQ receiver", e);
     }
+  }
 
+  @Override
+  public void stop(String message, Throwable error) {

Review Comment:
   Seems like the implementation is pretty similar to `stop(String message)` method above. Can it be optimised?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org