You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2022/04/25 23:20:31 UTC

[beam] branch master updated: [BEAM-13608] JmsIO dynamic topics feature (#17163)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a936ef88985 [BEAM-13608] JmsIO dynamic topics feature (#17163)
a936ef88985 is described below

commit a936ef889859f0c93682c2480a68487ae53c03fe
Author: rvballada <vi...@renault.com>
AuthorDate: Tue Apr 26 01:20:22 2022 +0200

    [BEAM-13608] JmsIO dynamic topics feature (#17163)
    
    * [BEAM-13608] => First Dynamic Topic management implementation
    
    * [BEAM-13608] => Changes after design document review:
    - Remove writeDynamic method, write is now parameterized,
    - Remove via method replaced by withTopicNameMapper and withValueMapper.
    
    * Add error management in WriteJmsResult
    
    * Write Unit Tests for WriteJmsResult
    
    * Rename SerializableMapper into SerializableMessageMapper
    
    * Update sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
    
    * Update sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
    
    * Update sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
    
    Code review
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
    
    * Update sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/TextMessageMapper.java
    
    Code review
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
    
    * Update sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
    
    * Update sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
    
    * [BEAM-13608] => Upgrade comments and code reviex
    
    * Update sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
    
    * Update sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
    
    * [BEAM-13608] => Upgrade CHANGES.ms
    
    * Update sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
    
    * Update sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
    
    * Update sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
    
    * Update sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
    
    * Update sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
    
    * Update sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
    
    * [BEAM-13608] => Delete SerializableMessageMapper and replace it with a SerializableBiFunction
    
    * [BEAM-13608] => Remove withCoder and use input.getCoder
    
    * [BEAM-13608] => Comment formatting
    
    * [BEAM-13608] => Update comments and documentation
    
    * [BEAM-13608] => Fix format violation
    
    * Apply suggestions from code review
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
---
 CHANGES.md                                         |  14 +-
 .../java/org/apache/beam/sdk/io/jms/JmsIO.java     | 191 ++++++++++++++++-----
 .../org/apache/beam/sdk/io/jms/JmsIOException.java |  28 +++
 .../apache/beam/sdk/io/jms/TextMessageMapper.java  |  42 +++++
 .../org/apache/beam/sdk/io/jms/WriteJmsResult.java |  71 ++++++++
 .../java/org/apache/beam/sdk/io/jms/JmsIOTest.java | 122 ++++++++++++-
 6 files changed, 425 insertions(+), 43 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index fe4ad04e3c5..9737f8a0b2f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -49,7 +49,7 @@
 ## Known Issues
 
 * ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
--->
+
 # [2.39.0] - Unreleased
 
 ## Highlights
@@ -59,16 +59,24 @@
 
 ## I/Os
 
-* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* JmsIO gains the ability to map any kind of input to any subclass of `javax.jms.Message` (Java) ([BEAM-16308](https://issues.apache.org/jira/browse/BEAM-16308)).
+* JmsIO introduces the ability to write to dynamic topics (Java) ([BEAM-16308](https://issues.apache.org/jira/browse/BEAM-16308)).
+  * A `topicNameMapper` must be set to extract the topic name from the input value.
+  * A `valueMapper` must be set to convert the input value to JMS message.
 
 ## New Features / Improvements
 
-* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 * 'Manage Clusters' JupyterLab extension added for users to configure usage of Dataproc clusters managed by Interactive Beam (Python) ([BEAM-14130](https://issues.apache.org/jira/browse/BEAM-14130)).
 
 ## Breaking Changes
 
 * Unused functions `ShallowCloneParDoPayload()`, `ShallowCloneSideInput()`, and `ShallowCloneFunctionSpec()` have been removed from the Go SDK's pipelinex package ([BEAM-13739](https://issues.apache.org/jira/browse/BEAM-13739)).
+* JmsIO requires an explicit `valueMapper` to be set ([BEAM-16308](https://issues.apache.org/jira/browse/BEAM-16308)). You can use the `TextMessageMapper` to convert `String` inputs to JMS `TestMessage`s:
+```java
+  JmsIO.<String>write()
+        .withConnectionFactory(jmsConnectionFactory)
+        .withValueMapper(new TextMessageMapper());
+```
 
 ## Deprecations
 
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index 9fa4492cf23..d5460b48bbb 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.stream.Stream;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
@@ -48,13 +49,19 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableBiFunction;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An unbounded source for JMS destinations (queues or topics).
@@ -107,7 +114,6 @@ import org.joda.time.Instant;
  *   .apply(JmsIO.write()
  *       .withConnectionFactory(myConnectionFactory)
  *       .withQueue("my-queue")
- *
  * }</pre>
  */
 @Experimental(Kind.SOURCE_SINK)
@@ -116,6 +122,8 @@ import org.joda.time.Instant;
 })
 public class JmsIO {
 
+  private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class);
+
   public static Read<JmsRecord> read() {
     return new AutoValue_JmsIO_Read.Builder<JmsRecord>()
         .setMaxNumRecords(Long.MAX_VALUE)
@@ -157,8 +165,8 @@ public class JmsIO {
     return new AutoValue_JmsIO_Read.Builder<T>().setMaxNumRecords(Long.MAX_VALUE).build();
   }
 
-  public static Write write() {
-    return new AutoValue_JmsIO_Write.Builder().build();
+  public static <EventT> Write<EventT> write() {
+    return new AutoValue_JmsIO_Write.Builder<EventT>().build();
   }
 
   /**
@@ -604,7 +612,8 @@ public class JmsIO {
    * and configuration.
    */
   @AutoValue
-  public abstract static class Write extends PTransform<PCollection<String>, PDone> {
+  public abstract static class Write<EventT>
+      extends PTransform<PCollection<EventT>, WriteJmsResult<EventT>> {
 
     abstract @Nullable ConnectionFactory getConnectionFactory();
 
@@ -616,21 +625,31 @@ public class JmsIO {
 
     abstract @Nullable String getPassword();
 
-    abstract Builder builder();
+    abstract @Nullable SerializableBiFunction<EventT, Session, Message> getValueMapper();
+
+    abstract @Nullable SerializableFunction<EventT, String> getTopicNameMapper();
+
+    abstract Builder<EventT> builder();
 
     @AutoValue.Builder
-    abstract static class Builder {
-      abstract Builder setConnectionFactory(ConnectionFactory connectionFactory);
+    abstract static class Builder<EventT> {
+      abstract Builder<EventT> setConnectionFactory(ConnectionFactory connectionFactory);
+
+      abstract Builder<EventT> setQueue(String queue);
+
+      abstract Builder<EventT> setTopic(String topic);
 
-      abstract Builder setQueue(String queue);
+      abstract Builder<EventT> setUsername(String username);
 
-      abstract Builder setTopic(String topic);
+      abstract Builder<EventT> setPassword(String password);
 
-      abstract Builder setUsername(String username);
+      abstract Builder<EventT> setValueMapper(
+          SerializableBiFunction<EventT, Session, Message> valueMapper);
 
-      abstract Builder setPassword(String password);
+      abstract Builder<EventT> setTopicNameMapper(
+          SerializableFunction<EventT, String> topicNameMapper);
 
-      abstract Write build();
+      abstract Write<EventT> build();
     }
 
     /**
@@ -646,7 +665,7 @@ public class JmsIO {
      * @param connectionFactory The JMS {@link ConnectionFactory}.
      * @return The corresponding {@link JmsIO.Read}.
      */
-    public Write withConnectionFactory(ConnectionFactory connectionFactory) {
+    public Write<EventT> withConnectionFactory(ConnectionFactory connectionFactory) {
       checkArgument(connectionFactory != null, "connectionFactory can not be null");
       return builder().setConnectionFactory(connectionFactory).build();
     }
@@ -656,7 +675,7 @@ public class JmsIO {
      * acts as a producer on the queue.
      *
      * <p>This method is exclusive with {@link JmsIO.Write#withTopic(String)}. The user has to
-     * specify a destination: queue or topic.
+     * specify a destination: queue, topic, or topicNameMapper.
      *
      * <p>For instance:
      *
@@ -668,7 +687,7 @@ public class JmsIO {
      * @param queue The JMS queue name where to send messages to.
      * @return The corresponding {@link JmsIO.Read}.
      */
-    public Write withQueue(String queue) {
+    public Write<EventT> withQueue(String queue) {
       checkArgument(queue != null, "queue can not be null");
       return builder().setQueue(queue).build();
     }
@@ -678,7 +697,7 @@ public class JmsIO {
      * as a publisher on the topic.
      *
      * <p>This method is exclusive with {@link JmsIO.Write#withQueue(String)}. The user has to
-     * specify a destination: queue or topic.
+     * specify a destination: queue, topic, or topicNameMapper.
      *
      * <p>For instance:
      *
@@ -690,47 +709,131 @@ public class JmsIO {
      * @param topic The JMS topic name.
      * @return The corresponding {@link JmsIO.Read}.
      */
-    public Write withTopic(String topic) {
+    public Write<EventT> withTopic(String topic) {
       checkArgument(topic != null, "topic can not be null");
       return builder().setTopic(topic).build();
     }
 
     /** Define the username to connect to the JMS broker (authenticated). */
-    public Write withUsername(String username) {
+    public Write<EventT> withUsername(String username) {
       checkArgument(username != null, "username can not be null");
       return builder().setUsername(username).build();
     }
 
     /** Define the password to connect to the JMS broker (authenticated). */
-    public Write withPassword(String password) {
+    public Write<EventT> withPassword(String password) {
       checkArgument(password != null, "password can not be null");
       return builder().setPassword(password).build();
     }
 
+    /**
+     * Specify the JMS topic destination name where to send messages to dynamically. The {@link
+     * JmsIO.Write} acts as a publisher on the topic.
+     *
+     * <p>This method is exclusive with {@link JmsIO.Write#withQueue(String) and
+     * {@link JmsIO.Write#withTopic(String)}. The user has to specify a {@link SerializableFunction}
+     * that takes {@code EventT} object as a parameter, and returns the topic name depending of the content
+     * of the event object.
+     *
+     * <p>For example:
+     * <pre>{@code
+     * SerializableFunction<CompanyEvent, String> topicNameMapper =
+     *   (event ->
+     *    String.format(
+     *    "company/%s/employee/%s",
+     *    event.getCompanyName(),
+     *    event.getEmployeeId()));
+     * }</pre>
+     *
+     * <pre>{@code
+     * .apply(JmsIO.write().withTopicNameMapper(topicNameNapper)
+     * }</pre>
+     *
+     * @param topicNameMapper The function returning the dynamic topic name.
+     * @return The corresponding {@link JmsIO.Write}.
+     */
+    public Write<EventT> withTopicNameMapper(SerializableFunction<EventT, String> topicNameMapper) {
+      checkArgument(topicNameMapper != null, "topicNameMapper can not be null");
+      return builder().setTopicNameMapper(topicNameMapper).build();
+    }
+
+    /**
+     * Map the {@code EventT} object to a {@link javax.jms.Message}.
+     *
+     * <p>For instance:
+     *
+     * <pre>{@code
+     * SerializableBiFunction<SomeEventObject, Session, Message> valueMapper = (e, s) -> {
+     *
+     *       try {
+     *         TextMessage msg = s.createTextMessage();
+     *         msg.setText(Mapper.MAPPER.toJson(e));
+     *         return msg;
+     *       } catch (JMSException ex) {
+     *         throw new JmsIOException("Error!!", ex);
+     *       }
+     *     };
+     *
+     * }</pre>
+     *
+     * <pre>{@code
+     * .apply(JmsIO.write().withValueMapper(valueNapper)
+     * }</pre>
+     *
+     * @param valueMapper The function returning the {@link javax.jms.Message}
+     * @return The corresponding {@link JmsIO.Write}.
+     */
+    public Write<EventT> withValueMapper(
+        SerializableBiFunction<EventT, Session, Message> valueMapper) {
+      checkArgument(valueMapper != null, "valueMapper can not be null");
+      return builder().setValueMapper(valueMapper).build();
+    }
+
     @Override
-    public PDone expand(PCollection<String> input) {
+    public WriteJmsResult<EventT> expand(PCollection<EventT> input) {
       checkArgument(getConnectionFactory() != null, "withConnectionFactory() is required");
       checkArgument(
-          getQueue() != null || getTopic() != null,
-          "Either withQueue(queue) or withTopic(topic) is required");
+          getTopicNameMapper() != null || getQueue() != null || getTopic() != null,
+          "Either withTopicNameMapper(topicNameMapper), withQueue(queue), or withTopic(topic) is required");
+      boolean exclusiveTopicQueue = isExclusiveTopicQueue();
       checkArgument(
-          getQueue() == null || getTopic() == null,
-          "withQueue(queue) and withTopic(topic) are exclusive");
+          exclusiveTopicQueue,
+          "Only one of withQueue(queue), withTopic(topic), or withTopicNameMapper(function) must be set.");
+      checkArgument(getValueMapper() != null, "withValueMapper() is required");
+
+      final TupleTag<EventT> failedMessagesTag = new TupleTag<>();
+      final TupleTag<EventT> messagesTag = new TupleTag<>();
+      PCollectionTuple res =
+          input.apply(
+              ParDo.of(new WriterFn<>(this, failedMessagesTag))
+                  .withOutputTags(messagesTag, TupleTagList.of(failedMessagesTag)));
+      PCollection<EventT> failedMessages = res.get(failedMessagesTag).setCoder(input.getCoder());
+      res.get(messagesTag).setCoder(input.getCoder());
+      return WriteJmsResult.in(input.getPipeline(), failedMessagesTag, failedMessages);
+    }
 
-      input.apply(ParDo.of(new WriterFn(this)));
-      return PDone.in(input.getPipeline());
+    private boolean isExclusiveTopicQueue() {
+      boolean exclusiveTopicQueue =
+          Stream.of(getQueue() != null, getTopic() != null, getTopicNameMapper() != null)
+                  .filter(b -> b)
+                  .count()
+              == 1;
+      return exclusiveTopicQueue;
     }
 
-    private static class WriterFn extends DoFn<String, Void> {
+    private static class WriterFn<EventT> extends DoFn<EventT, EventT> {
 
-      private Write spec;
+      private Write<EventT> spec;
 
       private Connection connection;
       private Session session;
       private MessageProducer producer;
+      private Destination destination;
+      private final TupleTag<EventT> failedMessageTag;
 
-      public WriterFn(Write spec) {
+      public WriterFn(Write<EventT> spec, TupleTag<EventT> failedMessageTag) {
         this.spec = spec;
+        this.failedMessageTag = failedMessageTag;
       }
 
       @Setup
@@ -746,21 +849,31 @@ public class JmsIO {
           this.connection.start();
           // false means we don't use JMS transaction.
           this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-          Destination destination;
+
           if (spec.getQueue() != null) {
-            destination = session.createQueue(spec.getQueue());
-          } else {
-            destination = session.createTopic(spec.getTopic());
+            this.destination = session.createQueue(spec.getQueue());
+          } else if (spec.getTopic() != null) {
+            this.destination = session.createTopic(spec.getTopic());
           }
-          this.producer = this.session.createProducer(destination);
+
+          this.producer = this.session.createProducer(null);
         }
       }
 
       @ProcessElement
-      public void processElement(ProcessContext ctx) throws Exception {
-        String value = ctx.element();
-        TextMessage message = session.createTextMessage(value);
-        producer.send(message);
+      public void processElement(ProcessContext ctx) {
+        Destination destinationToSendTo = destination;
+        try {
+          Message message = spec.getValueMapper().apply(ctx.element(), session);
+          if (spec.getTopicNameMapper() != null) {
+            destinationToSendTo =
+                session.createTopic(spec.getTopicNameMapper().apply(ctx.element()));
+          }
+          producer.send(destinationToSendTo, message);
+        } catch (Exception ex) {
+          LOG.error("Error sending message on topic {}", destinationToSendTo);
+          ctx.output(failedMessageTag, ctx.element());
+        }
       }
 
       @Teardown
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIOException.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIOException.java
new file mode 100644
index 00000000000..1b19fd56979
--- /dev/null
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIOException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jms;
+
+public class JmsIOException extends RuntimeException {
+  public JmsIOException(String message) {
+    super(message);
+  }
+
+  public JmsIOException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/TextMessageMapper.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/TextMessageMapper.java
new file mode 100644
index 00000000000..d5d85c46794
--- /dev/null
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/TextMessageMapper.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jms;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.beam.sdk.transforms.SerializableBiFunction;
+
+/**
+ * The TextMessageMapper takes a {@link String} value, a {@link javax.jms.Session} and returns a
+ * {@link javax.jms.TextMessage}.
+ */
+public class TextMessageMapper implements SerializableBiFunction<String, Session, Message> {
+
+  @Override
+  public Message apply(String value, Session session) {
+    try {
+      TextMessage msg = session.createTextMessage();
+      msg.setText(value);
+      return msg;
+    } catch (JMSException e) {
+      throw new JmsIOException("Error creating TextMessage", e);
+    }
+  }
+}
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/WriteJmsResult.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/WriteJmsResult.java
new file mode 100644
index 00000000000..22034e60b0e
--- /dev/null
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/WriteJmsResult.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jms;
+
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Return type of {@link JmsIO.Write} transform. All messages in error are identified by: -
+ * TupleTag<EventT> failedMessageTag - PCollection<EventT> failedMessages
+ */
+public class WriteJmsResult<EventT> implements POutput {
+
+  private final Pipeline pipeline;
+  private final TupleTag<EventT> failedMessageTag;
+  private final PCollection<EventT> failedMessages;
+
+  public WriteJmsResult(
+      Pipeline pipeline, TupleTag<EventT> failedMessageTag, PCollection<EventT> failedMessages) {
+    this.pipeline = pipeline;
+    this.failedMessageTag = failedMessageTag;
+    this.failedMessages = failedMessages;
+  }
+
+  static <FailevtT> WriteJmsResult<FailevtT> in(
+      Pipeline pipeline,
+      TupleTag<FailevtT> failedMessageTag,
+      PCollection<FailevtT> failedMessages) {
+    return new WriteJmsResult<FailevtT>(pipeline, failedMessageTag, failedMessages);
+  }
+
+  @Override
+  public Map<TupleTag<?>, PValue> expand() {
+    return ImmutableMap.of(failedMessageTag, failedMessages);
+  }
+
+  @Override
+  public Pipeline getPipeline() {
+    return pipeline;
+  }
+
+  public PCollection<EventT> getFailedMessages() {
+    return failedMessages;
+  }
+
+  @Override
+  public void finishSpecifyingOutput(
+      String transformName, PInput input, PTransform<?, ?> transform) {}
+}
diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
index a9f3c3f004e..2fc816b0f59 100644
--- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
+++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
@@ -30,6 +30,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.lang.reflect.Proxy;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -39,6 +40,7 @@ import java.util.function.Function;
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -62,6 +64,7 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableBiFunction;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
 import org.junit.After;
@@ -236,8 +239,9 @@ public class JmsIOTest {
     pipeline
         .apply(Create.of(data))
         .apply(
-            JmsIO.write()
+            JmsIO.<String>write()
                 .withConnectionFactory(connectionFactory)
+                .withValueMapper(new TextMessageMapper())
                 .withQueue(QUEUE)
                 .withUsername(USERNAME)
                 .withPassword(PASSWORD));
@@ -255,6 +259,87 @@ public class JmsIOTest {
     assertEquals(100, count);
   }
 
+  @Test
+  public void testWriteMessageWithError() throws Exception {
+    ArrayList<String> data = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {
+      data.add("Message " + i);
+    }
+
+    WriteJmsResult<String> output =
+        pipeline
+            .apply(Create.of(data))
+            .apply(
+                JmsIO.<String>write()
+                    .withConnectionFactory(connectionFactory)
+                    .withValueMapper(new TextMessageMapperWithError())
+                    .withQueue(QUEUE)
+                    .withUsername(USERNAME)
+                    .withPassword(PASSWORD));
+
+    PAssert.that(output.getFailedMessages()).containsInAnyOrder("Message 1", "Message 2");
+
+    pipeline.run();
+
+    Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD);
+    connection.start();
+    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE));
+    int count = 0;
+    while (consumer.receive(1000) != null) {
+      count++;
+    }
+    assertEquals(98, count);
+  }
+
+  @Test
+  public void testWriteDynamicMessage() throws Exception {
+    Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD);
+    connection.start();
+    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    MessageConsumer consumerOne = session.createConsumer(session.createTopic("Topic_One"));
+    MessageConsumer consumerTwo = session.createConsumer(session.createTopic("Topic_Two"));
+    ArrayList<TestEvent> data = new ArrayList<>();
+    for (int i = 0; i < 50; i++) {
+      data.add(new TestEvent("Topic_One", "Message One " + i));
+    }
+    for (int i = 0; i < 100; i++) {
+      data.add(new TestEvent("Topic_Two", "Message Two " + i));
+    }
+    pipeline
+        .apply(Create.of(data))
+        .apply(
+            JmsIO.<TestEvent>write()
+                .withConnectionFactory(connectionFactory)
+                .withUsername(USERNAME)
+                .withPassword(PASSWORD)
+                .withTopicNameMapper(e -> e.getTopicName())
+                .withValueMapper(
+                    (e, s) -> {
+                      try {
+                        TextMessage msg = s.createTextMessage();
+                        msg.setText(e.getValue());
+                        return msg;
+                      } catch (JMSException ex) {
+                        throw new JmsIOException("Error writing TextMessage", ex);
+                      }
+                    }));
+
+    pipeline.run();
+
+    int count = 0;
+    while (consumerOne.receive(1000) != null) {
+      count++;
+    }
+    assertEquals(50, count);
+
+    count = 0;
+    while (consumerTwo.receive(1000) != null) {
+      count++;
+    }
+    assertEquals(100, count);
+  }
+
   @Test
   public void testSplitForQueue() throws Exception {
     JmsIO.Read read = JmsIO.read().withQueue(QUEUE);
@@ -555,4 +640,39 @@ public class JmsIOTest {
               return result;
             });
   }
+
+  private static class TestEvent implements Serializable {
+    private final String topicName;
+    private final String value;
+
+    private TestEvent(String topicName, String value) {
+      this.topicName = topicName;
+      this.value = value;
+    }
+
+    private String getTopicName() {
+      return this.topicName;
+    }
+
+    private String getValue() {
+      return this.value;
+    }
+  }
+
+  private static class TextMessageMapperWithError
+      implements SerializableBiFunction<String, Session, Message> {
+    @Override
+    public Message apply(String value, Session session) {
+      try {
+        if (value.equals("Message 1") || value.equals("Message 2")) {
+          throw new JMSException("Error!!");
+        }
+        TextMessage msg = session.createTextMessage();
+        msg.setText(value);
+        return msg;
+      } catch (JMSException e) {
+        throw new JmsIOException("Error creating TextMessage", e);
+      }
+    }
+  }
 }