You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "Amraneze (via GitHub)" <gi...@apache.org> on 2023/02/10 18:28:21 UTC

[GitHub] [beam] Amraneze commented on a diff in pull request #24973: [#24971] Add a retry policy for JmsIO #24971

Amraneze commented on code in PR #24973:
URL: https://github.com/apache/beam/pull/24973#discussion_r1103091345


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -932,36 +1036,158 @@ public void setup() throws Exception {
           } else if (spec.getTopic() != null) {
             this.destination = session.createTopic(spec.getTopic());
           }
-
-          this.producer = this.session.createProducer(null);
+          this.producer = this.session.createProducer(this.destination);
+          this.isProducerNeedsToBeCreated = false;
         }
       }
 
-      @ProcessElement
-      public void processElement(ProcessContext ctx) {
+      public void publishMessage(T input) throws JMSException, JmsIOException {
         Destination destinationToSendTo = destination;
         try {
-          Message message = spec.getValueMapper().apply(ctx.element(), session);
+          Message message = spec.getValueMapper().apply(input, session);
           if (spec.getTopicNameMapper() != null) {
-            destinationToSendTo =
-                session.createTopic(spec.getTopicNameMapper().apply(ctx.element()));
+            destinationToSendTo = session.createTopic(spec.getTopicNameMapper().apply(input));
           }
           producer.send(destinationToSendTo, message);
-        } catch (Exception ex) {
-          LOG.error("Error sending message on topic {}", destinationToSendTo);
-          ctx.output(failedMessageTag, ctx.element());
+        } catch (JMSException | JmsIOException | NullPointerException exception) {
+          // Handle NPE in case of getValueMapper or getTopicNameMapper returns NPE
+          if (exception instanceof NullPointerException) {
+            throw new JmsIOException("An error occurred", exception);
+          }
+          throw exception;
+        }
+      }
+
+      public void close() throws JMSException {
+        isProducerNeedsToBeCreated = true;
+        if (producer != null) {
+          producer.close();
+          producer = null;
+        }
+        if (session != null) {
+          session.close();
+          session = null;
         }
+        if (connection != null) {
+          try {
+            // If the connection failed, stopping the connection will throw a JMSException
+            connection.stop();
+          } catch (JMSException exception) {
+            LOG.warn("The connection couldn't be closed", exception);
+          }
+          connection.close();
+          connection = null;
+        }
+      }
+    }
+
+    private static class JmsIOProducerFn<T> extends DoFn<T, T> {

Review Comment:
   I separated them because I'm not sure if the Backoff will hold the DoFn thread and I don't know if it will do that for the whole bundle (Which means the whole bundle will be in wait until the failed message is published). Just to not have any performances issues doing that. Do you think it would be better to keep in one DoFn ?
   
   >PS: I tried to have a sort of cache of failed messages and flush them in Finishbundle cycle, but when throwing an exception in ProcessElement, the worker will call TearDown without FinishBundle.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org