You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/07 20:28:00 UTC

[GitHub] [beam] lukecwik commented on a change in pull request #12184: [BEAM-10392] Make sure that messages that are unroutable are returned to the sender and redelivered.

lukecwik commented on a change in pull request #12184:
URL: https://github.com/apache/beam/pull/12184#discussion_r451123739



##########
File path: sdks/java/io/rabbitmq/src/test/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIOTest.java
##########
@@ -196,59 +200,74 @@ private void doExchangeTest(ExchangeTestPlan testPlan, boolean simulateIncompati
         exchangeType = "fanout";
       }
     }
+    final String finalExchangeType = exchangeType;
+    final CountDownLatch waitForExchangeToBeDeclared = new CountDownLatch(1);
+    final BlockingQueue<byte[]> recordsToPublish = new LinkedBlockingQueue<>();
+    recordsToPublish.addAll(RabbitMqTestUtils.generateRecords(testPlan.getNumRecordsToPublish()));
+    Thread publisher =
+        new Thread(
+            () -> {
+              Connection connection = null;
+              Channel channel = null;
+              try {
+                ConnectionFactory connectionFactory = new ConnectionFactory();
+                connectionFactory.setAutomaticRecoveryEnabled(false);
+                connectionFactory.setUri(uri);
+                connection = connectionFactory.newConnection();
+                channel = connection.createChannel();
+                channel.exchangeDeclare(exchange, finalExchangeType);
+                // We are relying on the pipeline to declare the queue and messages that are
+                // published without a queue being declared are "unroutable". Since there is a race
+                // between when the pipeline declares and when we can start publishing, we add a
+                // handler to republish messages that are returned to us.
+                channel.addReturnListener(
+                    (replyCode, replyText, exchange1, routingKey, properties, body) -> {
+                      try {
+                        recordsToPublish.put(body);
+                      } catch (Exception e) {
+                        throw new RuntimeException(e);
+                      }
+                    });
+                waitForExchangeToBeDeclared.countDown();
+                while (true) {
+                  byte[] record = recordsToPublish.take();
+                  if (record == terminalRecord) {
+                    return;
+                  }
+                  channel.basicPublish(
+                      exchange,
+                      testPlan.publishRoutingKeyGen().get(),
+                      true, // ensure that messages are returned to sender
+                      testPlan.getPublishProperties(),
+                      record);
+                }
 
-    ConnectionFactory connectionFactory = new ConnectionFactory();
-    connectionFactory.setAutomaticRecoveryEnabled(false);
-    connectionFactory.setUri(uri);
-    Connection connection = null;
-    Channel channel = null;
-
-    try {
-      connection = connectionFactory.newConnection();
-      channel = connection.createChannel();
-      channel.exchangeDeclare(exchange, exchangeType);
-      final Channel finalChannel = channel;
-      Thread publisher =
-          new Thread(
-              () -> {
-                try {
-                  Thread.sleep(5000);
-                } catch (Exception e) {
-                  LOG.error(e.getMessage(), e);
+              } catch (Exception e) {
+                throw new RuntimeException(e);
+              } finally {
+                if (channel != null) {
+                  // channel may have already been closed automatically due to protocol failure
+                  try {
+                    channel.close();
+                  } catch (Exception e) {
+                    /* ignored */
+                  }
                 }
-                for (int i = 0; i < testPlan.getNumRecordsToPublish(); i++) {
+                if (connection != null) {
+                  // connection may have already been closed automatically due to protocol failure
                   try {
-                    finalChannel.basicPublish(
-                        exchange,
-                        testPlan.publishRoutingKeyGen().get(),
-                        testPlan.getPublishProperties(),
-                        RabbitMqTestUtils.generateRecord(i));
+                    connection.close();
                   } catch (Exception e) {
-                    LOG.error(e.getMessage(), e);
+                    /* ignored */
                   }
                 }
-              });
-      publisher.start();
-      p.run();
-      publisher.join();
-    } finally {
-      if (channel != null) {
-        // channel may have already been closed automatically due to protocol failure
-        try {
-          channel.close();
-        } catch (Exception e) {
-          /* ignored */
-        }
-      }
-      if (connection != null) {
-        // connection may have already been closed automatically due to protocol failure
-        try {
-          connection.close();
-        } catch (Exception e) {
-          /* ignored */
-        }
-      }
-    }
+              }
+            });
+    publisher.start();
+    waitForExchangeToBeDeclared.countDown();

Review comment:
       You're right.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org