You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "Abacn (via GitHub)" <gi...@apache.org> on 2023/02/02 20:06:37 UTC

[GitHub] [beam] Abacn commented on a diff in pull request #24973: [#24971] Add a retry policy for JmsIO #24971

Abacn commented on code in PR #24973:
URL: https://github.com/apache/beam/pull/24973#discussion_r1095010257


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -897,71 +944,154 @@ private boolean isExclusiveTopicQueue() {
               == 1;
       return exclusiveTopicQueue;
     }
+  }
 
-    private static class WriterFn<EventT> extends DoFn<EventT, EventT> {
+  public static class JmsIOProducer<T> extends PTransform<PCollection<T>, WriteJmsResult<T>> {
 
-      private Write<EventT> spec;
+    public static final String CONNECTION_ERRORS_METRIC_NAME = "connectionErrors";
+    public static final String PUBLICATION_RETRIES_METRIC_NAME = "publicationRetries";
+    public static final String JMS_IO_PRODUCER_METRIC_NAME = JmsIOProducer.class.getCanonicalName();
 
-      private Connection connection;
-      private Session session;
-      private MessageProducer producer;
-      private Destination destination;
-      private final TupleTag<EventT> failedMessageTag;
+    private static final Logger LOG = LoggerFactory.getLogger(JmsIOProducer.class);
+    private static final String PUBLISH_TO_JMS_STEP_NAME = "Publish to JMS";
 
-      public WriterFn(Write<EventT> spec, TupleTag<EventT> failedMessageTag) {
-        this.spec = spec;
-        this.failedMessageTag = failedMessageTag;
-      }
+    private final JmsIO.Write<T> spec;
+    private final TupleTag<T> messagesTag;
+    private final TupleTag<T> failedMessagesTag;
+
+    JmsIOProducer(JmsIO.Write<T> spec) {
+      this.spec = spec;
+      this.messagesTag = new TupleTag<>();
+      this.failedMessagesTag = new TupleTag<>();
+    }
+
+    @Override
+    public WriteJmsResult<T> expand(PCollection<T> input) {
+      PCollectionTuple failedPublishedMessages =
+          input.apply(
+              PUBLISH_TO_JMS_STEP_NAME,
+              ParDo.of(new JmsIOProducerFn())
+                  .withOutputTags(messagesTag, TupleTagList.of(failedMessagesTag)));
+      PCollection<T> failedMessages =
+          failedPublishedMessages.get(failedMessagesTag).setCoder(input.getCoder());
+      failedPublishedMessages.get(messagesTag).setCoder(input.getCoder());
+      return WriteJmsResult.in(input.getPipeline(), failedMessagesTag, failedMessages);
+    }
+
+    private class JmsIOProducerFn extends DoFn<T, T> {
+
+      private transient @Initialized FluentBackoff retryBackOff;
+
+      private transient @Initialized Session session;
+      private transient @Initialized Connection connection;
+      private transient @Initialized Destination destination;
+      private transient @Initialized MessageProducer producer;
+
+      private final Counter connectionErrors =
+          Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, CONNECTION_ERRORS_METRIC_NAME);
+      private final Counter publicationRetries =
+          Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, PUBLICATION_RETRIES_METRIC_NAME);
 
       @Setup
-      public void setup() throws Exception {
-        if (producer == null) {
-          if (spec.getUsername() != null) {
-            this.connection =
-                spec.getConnectionFactory()
-                    .createConnection(spec.getUsername(), spec.getPassword());
-          } else {
-            this.connection = spec.getConnectionFactory().createConnection();
-          }
-          this.connection.start();
-          // false means we don't use JMS transaction.
-          this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-          if (spec.getQueue() != null) {
-            this.destination = session.createQueue(spec.getQueue());
-          } else if (spec.getTopic() != null) {
-            this.destination = session.createTopic(spec.getTopic());
-          }
+      public void setup() {
+        RetryConfiguration retryConfiguration = checkStateNotNull(spec.getRetryConfiguration());
+
+        retryBackOff =
+            FluentBackoff.DEFAULT
+                .withInitialBackoff(checkStateNotNull(retryConfiguration.getInitialDuration()))
+                .withMaxCumulativeBackoff(checkStateNotNull(retryConfiguration.getMaxDuration()))
+                .withMaxRetries(retryConfiguration.getMaxAttempts());
+      }
 
-          this.producer = this.session.createProducer(null);
+      @StartBundle
+      public void start() throws JMSException {
+        ConnectionFactory connectionFactory = spec.getConnectionFactory();
+        if (spec.getUsername() != null) {
+          this.connection =
+              connectionFactory.createConnection(spec.getUsername(), spec.getPassword());
+        } else {
+          this.connection = connectionFactory.createConnection();
         }
+        this.connection.setExceptionListener(
+            exception -> {
+              connectionErrors.inc();
+              throw new JmsIOException("An error occurred with JMS connection", exception);
+            });
+        this.connection.start();
+        // false means we don't use JMS transaction.
+        this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        if (spec.getQueue() != null) {
+          this.destination = session.createQueue(spec.getQueue());
+        } else if (spec.getTopic() != null) {
+          this.destination = session.createTopic(spec.getTopic());
+        }
+        this.producer = this.session.createProducer(this.destination);
       }
 
       @ProcessElement
-      public void processElement(ProcessContext ctx) {
-        Destination destinationToSendTo = destination;
+      public void processElement(@Element T input, ProcessContext context) {
         try {
-          Message message = spec.getValueMapper().apply(ctx.element(), session);
-          if (spec.getTopicNameMapper() != null) {
-            destinationToSendTo =
-                session.createTopic(spec.getTopicNameMapper().apply(ctx.element()));
+          publishMessage(input, context);
+        } catch (IOException | InterruptedException exception) {
+          LOG.error("Error while publishing the message", exception);
+          context.output(failedMessagesTag, input);
+          Thread.currentThread().interrupt();
+        }
+      }
+
+      private void publishMessage(T input, ProcessContext context)
+          throws IOException, InterruptedException {
+        Sleeper sleeper = Sleeper.DEFAULT;
+        Destination destinationToSendTo = destination;
+        BackOff backoff = checkStateNotNull(retryBackOff).backoff();
+        int publicationAttempt = 0;
+        while (publicationAttempt >= 0) {
+          publicationAttempt++;
+          try {
+            Message message = spec.getValueMapper().apply(input, session);
+            if (spec.getTopicNameMapper() != null) {
+              destinationToSendTo = session.createTopic(spec.getTopicNameMapper().apply(input));
+            }
+            producer.send(destinationToSendTo, message);
+            publicationAttempt = -1;
+          } catch (Exception exception) {
+            if (!BackOffUtils.next(sleeper, backoff)) {
+              LOG.error("The message wasn't published to topic {}", destinationToSendTo, exception);
+              context.output(failedMessagesTag, input);
+              publicationAttempt = -1;
+            } else {
+              publicationRetries.inc();
+              LOG.warn(
+                  "Error sending message on topic {}, retry attempt {}",
+                  destinationToSendTo,
+                  publicationAttempt,
+                  exception);
+            }
           }
-          producer.send(destinationToSendTo, message);
-        } catch (Exception ex) {
-          LOG.error("Error sending message on topic {}", destinationToSendTo);
-          ctx.output(failedMessageTag, ctx.element());
         }
       }
 
-      @Teardown
-      public void teardown() throws Exception {
-        producer.close();
-        producer = null;
-        session.close();
-        session = null;
-        connection.stop();
-        connection.close();
-        connection = null;
+      @FinishBundle

Review Comment:
   we may still want to have a `@TearDown` method to close the connection to avoid leak connections; and name `@FinishBundle` method as finishBundle to avoid confusion



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -897,71 +944,154 @@ private boolean isExclusiveTopicQueue() {
               == 1;
       return exclusiveTopicQueue;
     }
+  }
 
-    private static class WriterFn<EventT> extends DoFn<EventT, EventT> {
+  public static class JmsIOProducer<T> extends PTransform<PCollection<T>, WriteJmsResult<T>> {

Review Comment:
   this internal class does not need to go public; or it even looks not necessary to split this PTransform out from JmsIO.Write.expand(), is this some leftover from the previously splited JmsIOProducer.java?



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -897,71 +944,154 @@ private boolean isExclusiveTopicQueue() {
               == 1;
       return exclusiveTopicQueue;
     }
+  }
 
-    private static class WriterFn<EventT> extends DoFn<EventT, EventT> {
+  public static class JmsIOProducer<T> extends PTransform<PCollection<T>, WriteJmsResult<T>> {
 
-      private Write<EventT> spec;
+    public static final String CONNECTION_ERRORS_METRIC_NAME = "connectionErrors";
+    public static final String PUBLICATION_RETRIES_METRIC_NAME = "publicationRetries";
+    public static final String JMS_IO_PRODUCER_METRIC_NAME = JmsIOProducer.class.getCanonicalName();
 
-      private Connection connection;
-      private Session session;
-      private MessageProducer producer;
-      private Destination destination;
-      private final TupleTag<EventT> failedMessageTag;
+    private static final Logger LOG = LoggerFactory.getLogger(JmsIOProducer.class);
+    private static final String PUBLISH_TO_JMS_STEP_NAME = "Publish to JMS";
 
-      public WriterFn(Write<EventT> spec, TupleTag<EventT> failedMessageTag) {
-        this.spec = spec;
-        this.failedMessageTag = failedMessageTag;
-      }
+    private final JmsIO.Write<T> spec;
+    private final TupleTag<T> messagesTag;
+    private final TupleTag<T> failedMessagesTag;
+
+    JmsIOProducer(JmsIO.Write<T> spec) {
+      this.spec = spec;
+      this.messagesTag = new TupleTag<>();
+      this.failedMessagesTag = new TupleTag<>();
+    }
+
+    @Override
+    public WriteJmsResult<T> expand(PCollection<T> input) {
+      PCollectionTuple failedPublishedMessages =
+          input.apply(
+              PUBLISH_TO_JMS_STEP_NAME,
+              ParDo.of(new JmsIOProducerFn())
+                  .withOutputTags(messagesTag, TupleTagList.of(failedMessagesTag)));
+      PCollection<T> failedMessages =
+          failedPublishedMessages.get(failedMessagesTag).setCoder(input.getCoder());
+      failedPublishedMessages.get(messagesTag).setCoder(input.getCoder());
+      return WriteJmsResult.in(input.getPipeline(), failedMessagesTag, failedMessages);
+    }
+
+    private class JmsIOProducerFn extends DoFn<T, T> {
+
+      private transient @Initialized FluentBackoff retryBackOff;
+
+      private transient @Initialized Session session;
+      private transient @Initialized Connection connection;
+      private transient @Initialized Destination destination;
+      private transient @Initialized MessageProducer producer;
+
+      private final Counter connectionErrors =
+          Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, CONNECTION_ERRORS_METRIC_NAME);
+      private final Counter publicationRetries =
+          Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, PUBLICATION_RETRIES_METRIC_NAME);
 
       @Setup
-      public void setup() throws Exception {
-        if (producer == null) {
-          if (spec.getUsername() != null) {
-            this.connection =
-                spec.getConnectionFactory()
-                    .createConnection(spec.getUsername(), spec.getPassword());
-          } else {
-            this.connection = spec.getConnectionFactory().createConnection();
-          }
-          this.connection.start();
-          // false means we don't use JMS transaction.
-          this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-          if (spec.getQueue() != null) {
-            this.destination = session.createQueue(spec.getQueue());
-          } else if (spec.getTopic() != null) {
-            this.destination = session.createTopic(spec.getTopic());
-          }
+      public void setup() {
+        RetryConfiguration retryConfiguration = checkStateNotNull(spec.getRetryConfiguration());
+
+        retryBackOff =
+            FluentBackoff.DEFAULT
+                .withInitialBackoff(checkStateNotNull(retryConfiguration.getInitialDuration()))
+                .withMaxCumulativeBackoff(checkStateNotNull(retryConfiguration.getMaxDuration()))
+                .withMaxRetries(retryConfiguration.getMaxAttempts());
+      }
 
-          this.producer = this.session.createProducer(null);
+      @StartBundle
+      public void start() throws JMSException {
+        ConnectionFactory connectionFactory = spec.getConnectionFactory();
+        if (spec.getUsername() != null) {
+          this.connection =
+              connectionFactory.createConnection(spec.getUsername(), spec.getPassword());
+        } else {
+          this.connection = connectionFactory.createConnection();
         }
+        this.connection.setExceptionListener(
+            exception -> {
+              connectionErrors.inc();
+              throw new JmsIOException("An error occurred with JMS connection", exception);
+            });
+        this.connection.start();
+        // false means we don't use JMS transaction.
+        this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        if (spec.getQueue() != null) {
+          this.destination = session.createQueue(spec.getQueue());
+        } else if (spec.getTopic() != null) {
+          this.destination = session.createTopic(spec.getTopic());
+        }
+        this.producer = this.session.createProducer(this.destination);
       }
 
       @ProcessElement
-      public void processElement(ProcessContext ctx) {
-        Destination destinationToSendTo = destination;
+      public void processElement(@Element T input, ProcessContext context) {
         try {
-          Message message = spec.getValueMapper().apply(ctx.element(), session);
-          if (spec.getTopicNameMapper() != null) {
-            destinationToSendTo =
-                session.createTopic(spec.getTopicNameMapper().apply(ctx.element()));
+          publishMessage(input, context);
+        } catch (IOException | InterruptedException exception) {
+          LOG.error("Error while publishing the message", exception);
+          context.output(failedMessagesTag, input);
+          Thread.currentThread().interrupt();

Review Comment:
   We can just bubble up the exception instead of generating an InterruptedException here.



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -897,71 +944,154 @@ private boolean isExclusiveTopicQueue() {
               == 1;
       return exclusiveTopicQueue;
     }
+  }
 
-    private static class WriterFn<EventT> extends DoFn<EventT, EventT> {
+  public static class JmsIOProducer<T> extends PTransform<PCollection<T>, WriteJmsResult<T>> {
 
-      private Write<EventT> spec;
+    public static final String CONNECTION_ERRORS_METRIC_NAME = "connectionErrors";
+    public static final String PUBLICATION_RETRIES_METRIC_NAME = "publicationRetries";
+    public static final String JMS_IO_PRODUCER_METRIC_NAME = JmsIOProducer.class.getCanonicalName();
 
-      private Connection connection;
-      private Session session;
-      private MessageProducer producer;
-      private Destination destination;
-      private final TupleTag<EventT> failedMessageTag;
+    private static final Logger LOG = LoggerFactory.getLogger(JmsIOProducer.class);
+    private static final String PUBLISH_TO_JMS_STEP_NAME = "Publish to JMS";
 
-      public WriterFn(Write<EventT> spec, TupleTag<EventT> failedMessageTag) {
-        this.spec = spec;
-        this.failedMessageTag = failedMessageTag;
-      }
+    private final JmsIO.Write<T> spec;
+    private final TupleTag<T> messagesTag;
+    private final TupleTag<T> failedMessagesTag;
+
+    JmsIOProducer(JmsIO.Write<T> spec) {
+      this.spec = spec;
+      this.messagesTag = new TupleTag<>();
+      this.failedMessagesTag = new TupleTag<>();
+    }
+
+    @Override
+    public WriteJmsResult<T> expand(PCollection<T> input) {
+      PCollectionTuple failedPublishedMessages =
+          input.apply(
+              PUBLISH_TO_JMS_STEP_NAME,
+              ParDo.of(new JmsIOProducerFn())
+                  .withOutputTags(messagesTag, TupleTagList.of(failedMessagesTag)));
+      PCollection<T> failedMessages =
+          failedPublishedMessages.get(failedMessagesTag).setCoder(input.getCoder());
+      failedPublishedMessages.get(messagesTag).setCoder(input.getCoder());
+      return WriteJmsResult.in(input.getPipeline(), failedMessagesTag, failedMessages);
+    }
+
+    private class JmsIOProducerFn extends DoFn<T, T> {
+
+      private transient @Initialized FluentBackoff retryBackOff;
+
+      private transient @Initialized Session session;
+      private transient @Initialized Connection connection;
+      private transient @Initialized Destination destination;
+      private transient @Initialized MessageProducer producer;
+
+      private final Counter connectionErrors =
+          Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, CONNECTION_ERRORS_METRIC_NAME);
+      private final Counter publicationRetries =
+          Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, PUBLICATION_RETRIES_METRIC_NAME);
 
       @Setup
-      public void setup() throws Exception {
-        if (producer == null) {
-          if (spec.getUsername() != null) {
-            this.connection =
-                spec.getConnectionFactory()
-                    .createConnection(spec.getUsername(), spec.getPassword());
-          } else {
-            this.connection = spec.getConnectionFactory().createConnection();
-          }
-          this.connection.start();
-          // false means we don't use JMS transaction.
-          this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-          if (spec.getQueue() != null) {
-            this.destination = session.createQueue(spec.getQueue());
-          } else if (spec.getTopic() != null) {
-            this.destination = session.createTopic(spec.getTopic());
-          }
+      public void setup() {
+        RetryConfiguration retryConfiguration = checkStateNotNull(spec.getRetryConfiguration());
+
+        retryBackOff =
+            FluentBackoff.DEFAULT
+                .withInitialBackoff(checkStateNotNull(retryConfiguration.getInitialDuration()))
+                .withMaxCumulativeBackoff(checkStateNotNull(retryConfiguration.getMaxDuration()))
+                .withMaxRetries(retryConfiguration.getMaxAttempts());
+      }
 
-          this.producer = this.session.createProducer(null);
+      @StartBundle
+      public void start() throws JMSException {
+        ConnectionFactory connectionFactory = spec.getConnectionFactory();
+        if (spec.getUsername() != null) {
+          this.connection =
+              connectionFactory.createConnection(spec.getUsername(), spec.getPassword());
+        } else {
+          this.connection = connectionFactory.createConnection();
         }
+        this.connection.setExceptionListener(
+            exception -> {
+              connectionErrors.inc();
+              throw new JmsIOException("An error occurred with JMS connection", exception);
+            });
+        this.connection.start();
+        // false means we don't use JMS transaction.
+        this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        if (spec.getQueue() != null) {
+          this.destination = session.createQueue(spec.getQueue());
+        } else if (spec.getTopic() != null) {
+          this.destination = session.createTopic(spec.getTopic());
+        }
+        this.producer = this.session.createProducer(this.destination);
       }
 
       @ProcessElement
-      public void processElement(ProcessContext ctx) {
-        Destination destinationToSendTo = destination;
+      public void processElement(@Element T input, ProcessContext context) {
         try {
-          Message message = spec.getValueMapper().apply(ctx.element(), session);
-          if (spec.getTopicNameMapper() != null) {
-            destinationToSendTo =
-                session.createTopic(spec.getTopicNameMapper().apply(ctx.element()));
+          publishMessage(input, context);
+        } catch (IOException | InterruptedException exception) {
+          LOG.error("Error while publishing the message", exception);
+          context.output(failedMessagesTag, input);
+          Thread.currentThread().interrupt();
+        }
+      }
+
+      private void publishMessage(T input, ProcessContext context)
+          throws IOException, InterruptedException {
+        Sleeper sleeper = Sleeper.DEFAULT;
+        Destination destinationToSendTo = destination;
+        BackOff backoff = checkStateNotNull(retryBackOff).backoff();
+        int publicationAttempt = 0;
+        while (publicationAttempt >= 0) {
+          publicationAttempt++;
+          try {
+            Message message = spec.getValueMapper().apply(input, session);
+            if (spec.getTopicNameMapper() != null) {
+              destinationToSendTo = session.createTopic(spec.getTopicNameMapper().apply(input));
+            }
+            producer.send(destinationToSendTo, message);
+            publicationAttempt = -1;
+          } catch (Exception exception) {

Review Comment:
   It catches all Exceptions. We should only catch JMSException here, and ideally, checking if the JMSException is retryable (could be in follow up)



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -897,71 +944,154 @@ private boolean isExclusiveTopicQueue() {
               == 1;
       return exclusiveTopicQueue;
     }
+  }
 
-    private static class WriterFn<EventT> extends DoFn<EventT, EventT> {
+  public static class JmsIOProducer<T> extends PTransform<PCollection<T>, WriteJmsResult<T>> {
 
-      private Write<EventT> spec;
+    public static final String CONNECTION_ERRORS_METRIC_NAME = "connectionErrors";
+    public static final String PUBLICATION_RETRIES_METRIC_NAME = "publicationRetries";
+    public static final String JMS_IO_PRODUCER_METRIC_NAME = JmsIOProducer.class.getCanonicalName();
 
-      private Connection connection;
-      private Session session;
-      private MessageProducer producer;
-      private Destination destination;
-      private final TupleTag<EventT> failedMessageTag;
+    private static final Logger LOG = LoggerFactory.getLogger(JmsIOProducer.class);
+    private static final String PUBLISH_TO_JMS_STEP_NAME = "Publish to JMS";
 
-      public WriterFn(Write<EventT> spec, TupleTag<EventT> failedMessageTag) {
-        this.spec = spec;
-        this.failedMessageTag = failedMessageTag;
-      }
+    private final JmsIO.Write<T> spec;
+    private final TupleTag<T> messagesTag;
+    private final TupleTag<T> failedMessagesTag;
+
+    JmsIOProducer(JmsIO.Write<T> spec) {
+      this.spec = spec;
+      this.messagesTag = new TupleTag<>();
+      this.failedMessagesTag = new TupleTag<>();
+    }
+
+    @Override
+    public WriteJmsResult<T> expand(PCollection<T> input) {
+      PCollectionTuple failedPublishedMessages =
+          input.apply(
+              PUBLISH_TO_JMS_STEP_NAME,
+              ParDo.of(new JmsIOProducerFn())
+                  .withOutputTags(messagesTag, TupleTagList.of(failedMessagesTag)));
+      PCollection<T> failedMessages =
+          failedPublishedMessages.get(failedMessagesTag).setCoder(input.getCoder());
+      failedPublishedMessages.get(messagesTag).setCoder(input.getCoder());
+      return WriteJmsResult.in(input.getPipeline(), failedMessagesTag, failedMessages);
+    }
+
+    private class JmsIOProducerFn extends DoFn<T, T> {
+
+      private transient @Initialized FluentBackoff retryBackOff;
+
+      private transient @Initialized Session session;
+      private transient @Initialized Connection connection;
+      private transient @Initialized Destination destination;
+      private transient @Initialized MessageProducer producer;
+
+      private final Counter connectionErrors =
+          Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, CONNECTION_ERRORS_METRIC_NAME);
+      private final Counter publicationRetries =
+          Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, PUBLICATION_RETRIES_METRIC_NAME);
 
       @Setup
-      public void setup() throws Exception {
-        if (producer == null) {
-          if (spec.getUsername() != null) {
-            this.connection =
-                spec.getConnectionFactory()
-                    .createConnection(spec.getUsername(), spec.getPassword());
-          } else {
-            this.connection = spec.getConnectionFactory().createConnection();
-          }
-          this.connection.start();
-          // false means we don't use JMS transaction.
-          this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-          if (spec.getQueue() != null) {
-            this.destination = session.createQueue(spec.getQueue());
-          } else if (spec.getTopic() != null) {
-            this.destination = session.createTopic(spec.getTopic());
-          }
+      public void setup() {
+        RetryConfiguration retryConfiguration = checkStateNotNull(spec.getRetryConfiguration());
+
+        retryBackOff =
+            FluentBackoff.DEFAULT
+                .withInitialBackoff(checkStateNotNull(retryConfiguration.getInitialDuration()))
+                .withMaxCumulativeBackoff(checkStateNotNull(retryConfiguration.getMaxDuration()))
+                .withMaxRetries(retryConfiguration.getMaxAttempts());
+      }
 
-          this.producer = this.session.createProducer(null);
+      @StartBundle
+      public void start() throws JMSException {
+        ConnectionFactory connectionFactory = spec.getConnectionFactory();
+        if (spec.getUsername() != null) {
+          this.connection =
+              connectionFactory.createConnection(spec.getUsername(), spec.getPassword());
+        } else {
+          this.connection = connectionFactory.createConnection();
         }
+        this.connection.setExceptionListener(
+            exception -> {
+              connectionErrors.inc();
+              throw new JmsIOException("An error occurred with JMS connection", exception);
+            });
+        this.connection.start();
+        // false means we don't use JMS transaction.
+        this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        if (spec.getQueue() != null) {
+          this.destination = session.createQueue(spec.getQueue());
+        } else if (spec.getTopic() != null) {
+          this.destination = session.createTopic(spec.getTopic());
+        }
+        this.producer = this.session.createProducer(this.destination);
       }
 
       @ProcessElement
-      public void processElement(ProcessContext ctx) {
-        Destination destinationToSendTo = destination;
+      public void processElement(@Element T input, ProcessContext context) {
         try {
-          Message message = spec.getValueMapper().apply(ctx.element(), session);
-          if (spec.getTopicNameMapper() != null) {
-            destinationToSendTo =
-                session.createTopic(spec.getTopicNameMapper().apply(ctx.element()));
+          publishMessage(input, context);
+        } catch (IOException | InterruptedException exception) {
+          LOG.error("Error while publishing the message", exception);
+          context.output(failedMessagesTag, input);
+          Thread.currentThread().interrupt();
+        }
+      }
+
+      private void publishMessage(T input, ProcessContext context)
+          throws IOException, InterruptedException {
+        Sleeper sleeper = Sleeper.DEFAULT;
+        Destination destinationToSendTo = destination;
+        BackOff backoff = checkStateNotNull(retryBackOff).backoff();
+        int publicationAttempt = 0;
+        while (publicationAttempt >= 0) {
+          publicationAttempt++;
+          try {
+            Message message = spec.getValueMapper().apply(input, session);
+            if (spec.getTopicNameMapper() != null) {
+              destinationToSendTo = session.createTopic(spec.getTopicNameMapper().apply(input));
+            }
+            producer.send(destinationToSendTo, message);
+            publicationAttempt = -1;
+          } catch (Exception exception) {
+            if (!BackOffUtils.next(sleeper, backoff)) {
+              LOG.error("The message wasn't published to topic {}", destinationToSendTo, exception);
+              context.output(failedMessagesTag, input);
+              publicationAttempt = -1;
+            } else {
+              publicationRetries.inc();
+              LOG.warn(
+                  "Error sending message on topic {}, retry attempt {}",
+                  destinationToSendTo,
+                  publicationAttempt,
+                  exception);
+            }
           }
-          producer.send(destinationToSendTo, message);
-        } catch (Exception ex) {
-          LOG.error("Error sending message on topic {}", destinationToSendTo);
-          ctx.output(failedMessageTag, ctx.element());
         }
       }
 
-      @Teardown
-      public void teardown() throws Exception {
-        producer.close();
-        producer = null;
-        session.close();
-        session = null;
-        connection.stop();
-        connection.close();
-        connection = null;
+      @FinishBundle
+      public void teardown() throws JMSException {
+        if (producer != null) {
+          producer.close();
+          producer = null;
+        }
+        if (session != null) {
+          session.close();
+          session = null;
+        }
+        if (connection != null) {

Review Comment:
   I am thinking about the possible overhead of close and re-establish all of producer, session, and connection in each bundle. Does close the producer alone suffices flushing the message (committing them to server)? If so we can just close the producer, and close session and connections in tearDown.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org