Posted to commits@beam.apache.org by lc...@apache.org on 2022/04/25 23:20:31 UTC
[beam] branch master updated: [BEAM-13608] JmsIO dynamic topics feature (#17163)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a936ef88985 [BEAM-13608] JmsIO dynamic topics feature (#17163)
a936ef88985 is described below
commit a936ef889859f0c93682c2480a68487ae53c03fe
Author: rvballada <vi...@renault.com>
AuthorDate: Tue Apr 26 01:20:22 2022 +0200
[BEAM-13608] JmsIO dynamic topics feature (#17163)
* [BEAM-13608] => First Dynamic Topic management implementation
* [BEAM-13608] => Changes after design document review:
- Remove writeDynamic method, write is now parameterized,
- Remove via method, replaced by withTopicNameMapper and withValueMapper.
* Add error management in WriteJmsResult
* Write Unit Tests for WriteJmsResult
* Rename SerializableMapper into SerializableMessageMapper
* Update sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
Co-authored-by: Lukasz Cwik <lc...@google.com>
* Update sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
Co-authored-by: Lukasz Cwik <lc...@google.com>
* Update sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
Code review
Co-authored-by: Lukasz Cwik <lc...@google.com>
* Update sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/TextMessageMapper.java
Code review
Co-authored-by: Lukasz Cwik <lc...@google.com>
* Update sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
Co-authored-by: Lukasz Cwik <lc...@google.com>
* Update sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
Co-authored-by: Lukasz Cwik <lc...@google.com>
* [BEAM-13608] => Upgrade comments and code review
* Update sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
Co-authored-by: Lukasz Cwik <lc...@google.com>
* Update sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
Co-authored-by: Lukasz Cwik <lc...@google.com>
* [BEAM-13608] => Upgrade CHANGES.md
* Update sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
Co-authored-by: Lukasz Cwik <lc...@google.com>
* Update sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
Co-authored-by: Lukasz Cwik <lc...@google.com>
* Update sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
Co-authored-by: Lukasz Cwik <lc...@google.com>
* Update sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
Co-authored-by: Lukasz Cwik <lc...@google.com>
* Update sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
Co-authored-by: Lukasz Cwik <lc...@google.com>
* Update sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
Co-authored-by: Lukasz Cwik <lc...@google.com>
* [BEAM-13608] => Delete SerializableMessageMapper and replace it with a SerializableBiFunction
* [BEAM-13608] => Remove withCoder and use input.getCoder
* [BEAM-13608] => Comment formatting
* [BEAM-13608] => Update comments and documentation
* [BEAM-13608] => Fix format violation
* Apply suggestions from code review
Co-authored-by: Lukasz Cwik <lc...@google.com>
---
CHANGES.md | 14 +-
.../java/org/apache/beam/sdk/io/jms/JmsIO.java | 191 ++++++++++++++++-----
.../org/apache/beam/sdk/io/jms/JmsIOException.java | 28 +++
.../apache/beam/sdk/io/jms/TextMessageMapper.java | 42 +++++
.../org/apache/beam/sdk/io/jms/WriteJmsResult.java | 71 ++++++++
.../java/org/apache/beam/sdk/io/jms/JmsIOTest.java | 122 ++++++++++++-
6 files changed, 425 insertions(+), 43 deletions(-)
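To make the new write path easier to follow before reading the diff, here is a minimal stand-alone sketch of how the two mappers cooperate and how failed elements are routed. It deliberately uses no Beam or JMS types: `DynamicTopicSketch`, `Event`, `Result`, and `write` are hypothetical names, the message is modeled as a plain `String`, and the `Session` argument that the real `valueMapper` receives is omitted. It is an illustration of the control flow in `WriterFn.processElement` under those assumptions, not the actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Stand-alone model of the dynamic-topic write path; no Beam or JMS types are used. */
class DynamicTopicSketch {

  /** Hypothetical event type, similar in spirit to the TestEvent class in JmsIOTest. */
  static final class Event {
    final String topicName;
    final String value;

    Event(String topicName, String value) {
      this.topicName = topicName;
      this.value = value;
    }
  }

  /** Hypothetical stand-in for WriteJmsResult: payloads sent per topic plus failed inputs. */
  static final class Result<T> {
    final Map<String, List<String>> sentByTopic = new LinkedHashMap<>();
    final List<T> failed = new ArrayList<>();
  }

  /**
   * Maps each element to a message, resolves its topic, and "sends" it. If either mapper
   * throws, the original element is added to the failed list instead of aborting the loop.
   */
  static <T> Result<T> write(
      List<T> input, Function<T, String> topicNameMapper, Function<T, String> valueMapper) {
    Result<T> result = new Result<>();
    for (T element : input) {
      try {
        String message = valueMapper.apply(element);
        String topic = topicNameMapper.apply(element);
        result.sentByTopic.computeIfAbsent(topic, t -> new ArrayList<>()).add(message);
      } catch (RuntimeException ex) {
        result.failed.add(element);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<Event> events =
        Arrays.asList(
            new Event("Topic_One", "a"),
            new Event("Topic_Two", "b"),
            new Event("Topic_One", "bad"));
    Result<Event> result =
        write(
            events,
            e -> e.topicName,
            e -> {
              if (e.value.equals("bad")) {
                throw new IllegalStateException("cannot map " + e.value);
              }
              return e.value;
            });
    System.out.println(result.sentByTopic + " failed=" + result.failed.size());
  }
}
```

In the commit itself the failed elements are emitted to a tagged output and exposed through `WriteJmsResult#getFailedMessages()`, so a pipeline can handle them downstream rather than failing the bundle.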
diff --git a/CHANGES.md b/CHANGES.md
index fe4ad04e3c5..9737f8a0b2f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -49,7 +49,7 @@
## Known Issues
* ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
--->
+
# [2.39.0] - Unreleased
## Highlights
@@ -59,16 +59,24 @@
## I/Os
-* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* JmsIO gains the ability to map any kind of input to any subclass of `javax.jms.Message` (Java) ([BEAM-13608](https://issues.apache.org/jira/browse/BEAM-13608)).
+* JmsIO introduces the ability to write to dynamic topics (Java) ([BEAM-13608](https://issues.apache.org/jira/browse/BEAM-13608)).
+ * A `topicNameMapper` must be set to extract the topic name from the input value.
+ * A `valueMapper` must be set to convert the input value to a JMS message.
## New Features / Improvements
-* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* 'Manage Clusters' JupyterLab extension added for users to configure usage of Dataproc clusters managed by Interactive Beam (Python) ([BEAM-14130](https://issues.apache.org/jira/browse/BEAM-14130)).
## Breaking Changes
* Unused functions `ShallowCloneParDoPayload()`, `ShallowCloneSideInput()`, and `ShallowCloneFunctionSpec()` have been removed from the Go SDK's pipelinex package ([BEAM-13739](https://issues.apache.org/jira/browse/BEAM-13739)).
+* JmsIO requires an explicit `valueMapper` to be set ([BEAM-13608](https://issues.apache.org/jira/browse/BEAM-13608)). You can use the `TextMessageMapper` to convert `String` inputs to JMS `TextMessage`s:
+```java
+ JmsIO.<String>write()
+ .withConnectionFactory(jmsConnectionFactory)
+ .withValueMapper(new TextMessageMapper());
+```
## Deprecations
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index 9fa4492cf23..d5460b48bbb 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.stream.Stream;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
@@ -48,13 +49,19 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableBiFunction;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An unbounded source for JMS destinations (queues or topics).
@@ -107,7 +114,6 @@ import org.joda.time.Instant;
* .apply(JmsIO.write()
* .withConnectionFactory(myConnectionFactory)
* .withQueue("my-queue")
- *
* }</pre>
*/
@Experimental(Kind.SOURCE_SINK)
@@ -116,6 +122,8 @@ import org.joda.time.Instant;
})
public class JmsIO {
+ private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class);
+
public static Read<JmsRecord> read() {
return new AutoValue_JmsIO_Read.Builder<JmsRecord>()
.setMaxNumRecords(Long.MAX_VALUE)
@@ -157,8 +165,8 @@ public class JmsIO {
return new AutoValue_JmsIO_Read.Builder<T>().setMaxNumRecords(Long.MAX_VALUE).build();
}
- public static Write write() {
- return new AutoValue_JmsIO_Write.Builder().build();
+ public static <EventT> Write<EventT> write() {
+ return new AutoValue_JmsIO_Write.Builder<EventT>().build();
}
/**
@@ -604,7 +612,8 @@ public class JmsIO {
* and configuration.
*/
@AutoValue
- public abstract static class Write extends PTransform<PCollection<String>, PDone> {
+ public abstract static class Write<EventT>
+ extends PTransform<PCollection<EventT>, WriteJmsResult<EventT>> {
abstract @Nullable ConnectionFactory getConnectionFactory();
@@ -616,21 +625,31 @@ public class JmsIO {
abstract @Nullable String getPassword();
- abstract Builder builder();
+ abstract @Nullable SerializableBiFunction<EventT, Session, Message> getValueMapper();
+
+ abstract @Nullable SerializableFunction<EventT, String> getTopicNameMapper();
+
+ abstract Builder<EventT> builder();
@AutoValue.Builder
- abstract static class Builder {
- abstract Builder setConnectionFactory(ConnectionFactory connectionFactory);
+ abstract static class Builder<EventT> {
+ abstract Builder<EventT> setConnectionFactory(ConnectionFactory connectionFactory);
+
+ abstract Builder<EventT> setQueue(String queue);
+
+ abstract Builder<EventT> setTopic(String topic);
- abstract Builder setQueue(String queue);
+ abstract Builder<EventT> setUsername(String username);
- abstract Builder setTopic(String topic);
+ abstract Builder<EventT> setPassword(String password);
- abstract Builder setUsername(String username);
+ abstract Builder<EventT> setValueMapper(
+ SerializableBiFunction<EventT, Session, Message> valueMapper);
- abstract Builder setPassword(String password);
+ abstract Builder<EventT> setTopicNameMapper(
+ SerializableFunction<EventT, String> topicNameMapper);
- abstract Write build();
+ abstract Write<EventT> build();
}
/**
@@ -646,7 +665,7 @@ public class JmsIO {
* @param connectionFactory The JMS {@link ConnectionFactory}.
* @return The corresponding {@link JmsIO.Read}.
*/
- public Write withConnectionFactory(ConnectionFactory connectionFactory) {
+ public Write<EventT> withConnectionFactory(ConnectionFactory connectionFactory) {
checkArgument(connectionFactory != null, "connectionFactory can not be null");
return builder().setConnectionFactory(connectionFactory).build();
}
@@ -656,7 +675,7 @@ public class JmsIO {
* acts as a producer on the queue.
*
* <p>This method is exclusive with {@link JmsIO.Write#withTopic(String)}. The user has to
- * specify a destination: queue or topic.
+ * specify a destination: queue, topic, or topicNameMapper.
*
* <p>For instance:
*
@@ -668,7 +687,7 @@ public class JmsIO {
* @param queue The JMS queue name where to send messages to.
* @return The corresponding {@link JmsIO.Read}.
*/
- public Write withQueue(String queue) {
+ public Write<EventT> withQueue(String queue) {
checkArgument(queue != null, "queue can not be null");
return builder().setQueue(queue).build();
}
@@ -678,7 +697,7 @@ public class JmsIO {
* as a publisher on the topic.
*
* <p>This method is exclusive with {@link JmsIO.Write#withQueue(String)}. The user has to
- * specify a destination: queue or topic.
+ * specify a destination: queue, topic, or topicNameMapper.
*
* <p>For instance:
*
@@ -690,47 +709,131 @@ public class JmsIO {
* @param topic The JMS topic name.
* @return The corresponding {@link JmsIO.Read}.
*/
- public Write withTopic(String topic) {
+ public Write<EventT> withTopic(String topic) {
checkArgument(topic != null, "topic can not be null");
return builder().setTopic(topic).build();
}
/** Define the username to connect to the JMS broker (authenticated). */
- public Write withUsername(String username) {
+ public Write<EventT> withUsername(String username) {
checkArgument(username != null, "username can not be null");
return builder().setUsername(username).build();
}
/** Define the password to connect to the JMS broker (authenticated). */
- public Write withPassword(String password) {
+ public Write<EventT> withPassword(String password) {
checkArgument(password != null, "password can not be null");
return builder().setPassword(password).build();
}
+ /**
+ * Specify the JMS topic destination name where to send messages to dynamically. The {@link
+ * JmsIO.Write} acts as a publisher on the topic.
+ *
+ * <p>This method is exclusive with {@link JmsIO.Write#withQueue(String)} and
+ * {@link JmsIO.Write#withTopic(String)}. The user has to specify a {@link SerializableFunction}
+ * that takes an {@code EventT} object as a parameter and returns the topic name depending on the
+ * content of the event object.
+ *
+ * <p>For example:
+ * <pre>{@code
+ * SerializableFunction<CompanyEvent, String> topicNameMapper =
+ * (event ->
+ * String.format(
+ * "company/%s/employee/%s",
+ * event.getCompanyName(),
+ * event.getEmployeeId()));
+ * }</pre>
+ *
+ * <pre>{@code
+ * .apply(JmsIO.write().withTopicNameMapper(topicNameMapper))
+ * }</pre>
+ *
+ * @param topicNameMapper The function returning the dynamic topic name.
+ * @return The corresponding {@link JmsIO.Write}.
+ */
+ public Write<EventT> withTopicNameMapper(SerializableFunction<EventT, String> topicNameMapper) {
+ checkArgument(topicNameMapper != null, "topicNameMapper can not be null");
+ return builder().setTopicNameMapper(topicNameMapper).build();
+ }
+
+ /**
+ * Map the {@code EventT} object to a {@link javax.jms.Message}.
+ *
+ * <p>For instance:
+ *
+ * <pre>{@code
+ * SerializableBiFunction<SomeEventObject, Session, Message> valueMapper = (e, s) -> {
+ *
+ * try {
+ * TextMessage msg = s.createTextMessage();
+ * msg.setText(Mapper.MAPPER.toJson(e));
+ * return msg;
+ * } catch (JMSException ex) {
+ * throw new JmsIOException("Error!!", ex);
+ * }
+ * };
+ *
+ * }</pre>
+ *
+ * <pre>{@code
+ * .apply(JmsIO.write().withValueMapper(valueMapper))
+ * }</pre>
+ *
+ * @param valueMapper The function returning the {@link javax.jms.Message}
+ * @return The corresponding {@link JmsIO.Write}.
+ */
+ public Write<EventT> withValueMapper(
+ SerializableBiFunction<EventT, Session, Message> valueMapper) {
+ checkArgument(valueMapper != null, "valueMapper can not be null");
+ return builder().setValueMapper(valueMapper).build();
+ }
+
@Override
- public PDone expand(PCollection<String> input) {
+ public WriteJmsResult<EventT> expand(PCollection<EventT> input) {
checkArgument(getConnectionFactory() != null, "withConnectionFactory() is required");
checkArgument(
- getQueue() != null || getTopic() != null,
- "Either withQueue(queue) or withTopic(topic) is required");
+ getTopicNameMapper() != null || getQueue() != null || getTopic() != null,
+ "Either withTopicNameMapper(topicNameMapper), withQueue(queue), or withTopic(topic) is required");
+ boolean exclusiveTopicQueue = isExclusiveTopicQueue();
checkArgument(
- getQueue() == null || getTopic() == null,
- "withQueue(queue) and withTopic(topic) are exclusive");
+ exclusiveTopicQueue,
+ "Only one of withQueue(queue), withTopic(topic), or withTopicNameMapper(function) must be set.");
+ checkArgument(getValueMapper() != null, "withValueMapper() is required");
+
+ final TupleTag<EventT> failedMessagesTag = new TupleTag<>();
+ final TupleTag<EventT> messagesTag = new TupleTag<>();
+ PCollectionTuple res =
+ input.apply(
+ ParDo.of(new WriterFn<>(this, failedMessagesTag))
+ .withOutputTags(messagesTag, TupleTagList.of(failedMessagesTag)));
+ PCollection<EventT> failedMessages = res.get(failedMessagesTag).setCoder(input.getCoder());
+ res.get(messagesTag).setCoder(input.getCoder());
+ return WriteJmsResult.in(input.getPipeline(), failedMessagesTag, failedMessages);
+ }
- input.apply(ParDo.of(new WriterFn(this)));
- return PDone.in(input.getPipeline());
+ private boolean isExclusiveTopicQueue() {
+ boolean exclusiveTopicQueue =
+ Stream.of(getQueue() != null, getTopic() != null, getTopicNameMapper() != null)
+ .filter(b -> b)
+ .count()
+ == 1;
+ return exclusiveTopicQueue;
}
- private static class WriterFn extends DoFn<String, Void> {
+ private static class WriterFn<EventT> extends DoFn<EventT, EventT> {
- private Write spec;
+ private Write<EventT> spec;
private Connection connection;
private Session session;
private MessageProducer producer;
+ private Destination destination;
+ private final TupleTag<EventT> failedMessageTag;
- public WriterFn(Write spec) {
+ public WriterFn(Write<EventT> spec, TupleTag<EventT> failedMessageTag) {
this.spec = spec;
+ this.failedMessageTag = failedMessageTag;
}
@Setup
@@ -746,21 +849,31 @@ public class JmsIO {
this.connection.start();
// false means we don't use JMS transaction.
this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination;
+
if (spec.getQueue() != null) {
- destination = session.createQueue(spec.getQueue());
- } else {
- destination = session.createTopic(spec.getTopic());
+ this.destination = session.createQueue(spec.getQueue());
+ } else if (spec.getTopic() != null) {
+ this.destination = session.createTopic(spec.getTopic());
}
- this.producer = this.session.createProducer(destination);
+
+ this.producer = this.session.createProducer(null);
}
}
@ProcessElement
- public void processElement(ProcessContext ctx) throws Exception {
- String value = ctx.element();
- TextMessage message = session.createTextMessage(value);
- producer.send(message);
+ public void processElement(ProcessContext ctx) {
+ Destination destinationToSendTo = destination;
+ try {
+ Message message = spec.getValueMapper().apply(ctx.element(), session);
+ if (spec.getTopicNameMapper() != null) {
+ destinationToSendTo =
+ session.createTopic(spec.getTopicNameMapper().apply(ctx.element()));
+ }
+ producer.send(destinationToSendTo, message);
+ } catch (Exception ex) {
+ LOG.error("Error sending message on topic {}", destinationToSendTo, ex);
+ ctx.output(failedMessageTag, ctx.element());
+ }
}
@Teardown
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIOException.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIOException.java
new file mode 100644
index 00000000000..1b19fd56979
--- /dev/null
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIOException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jms;
+
+public class JmsIOException extends RuntimeException {
+ public JmsIOException(String message) {
+ super(message);
+ }
+
+ public JmsIOException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/TextMessageMapper.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/TextMessageMapper.java
new file mode 100644
index 00000000000..d5d85c46794
--- /dev/null
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/TextMessageMapper.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jms;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.beam.sdk.transforms.SerializableBiFunction;
+
+/**
+ * The TextMessageMapper takes a {@link String} value, a {@link javax.jms.Session} and returns a
+ * {@link javax.jms.TextMessage}.
+ */
+public class TextMessageMapper implements SerializableBiFunction<String, Session, Message> {
+
+ @Override
+ public Message apply(String value, Session session) {
+ try {
+ TextMessage msg = session.createTextMessage();
+ msg.setText(value);
+ return msg;
+ } catch (JMSException e) {
+ throw new JmsIOException("Error creating TextMessage", e);
+ }
+ }
+}
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/WriteJmsResult.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/WriteJmsResult.java
new file mode 100644
index 00000000000..22034e60b0e
--- /dev/null
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/WriteJmsResult.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jms;
+
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Return type of the {@link JmsIO.Write} transform. Failed messages are identified by the {@code
+ * TupleTag<EventT> failedMessageTag} and collected in the {@code PCollection<EventT> failedMessages}.
+ */
+public class WriteJmsResult<EventT> implements POutput {
+
+ private final Pipeline pipeline;
+ private final TupleTag<EventT> failedMessageTag;
+ private final PCollection<EventT> failedMessages;
+
+ public WriteJmsResult(
+ Pipeline pipeline, TupleTag<EventT> failedMessageTag, PCollection<EventT> failedMessages) {
+ this.pipeline = pipeline;
+ this.failedMessageTag = failedMessageTag;
+ this.failedMessages = failedMessages;
+ }
+
+ static <FailevtT> WriteJmsResult<FailevtT> in(
+ Pipeline pipeline,
+ TupleTag<FailevtT> failedMessageTag,
+ PCollection<FailevtT> failedMessages) {
+ return new WriteJmsResult<FailevtT>(pipeline, failedMessageTag, failedMessages);
+ }
+
+ @Override
+ public Map<TupleTag<?>, PValue> expand() {
+ return ImmutableMap.of(failedMessageTag, failedMessages);
+ }
+
+ @Override
+ public Pipeline getPipeline() {
+ return pipeline;
+ }
+
+ public PCollection<EventT> getFailedMessages() {
+ return failedMessages;
+ }
+
+ @Override
+ public void finishSpecifyingOutput(
+ String transformName, PInput input, PTransform<?, ?> transform) {}
+}
diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
index a9f3c3f004e..2fc816b0f59 100644
--- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
+++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
@@ -30,6 +30,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.io.Serializable;
import java.lang.reflect.Proxy;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -39,6 +40,7 @@ import java.util.function.Function;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -62,6 +64,7 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.junit.After;
@@ -236,8 +239,9 @@ public class JmsIOTest {
pipeline
.apply(Create.of(data))
.apply(
- JmsIO.write()
+ JmsIO.<String>write()
.withConnectionFactory(connectionFactory)
+ .withValueMapper(new TextMessageMapper())
.withQueue(QUEUE)
.withUsername(USERNAME)
.withPassword(PASSWORD));
@@ -255,6 +259,87 @@ public class JmsIOTest {
assertEquals(100, count);
}
+ @Test
+ public void testWriteMessageWithError() throws Exception {
+ ArrayList<String> data = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ data.add("Message " + i);
+ }
+
+ WriteJmsResult<String> output =
+ pipeline
+ .apply(Create.of(data))
+ .apply(
+ JmsIO.<String>write()
+ .withConnectionFactory(connectionFactory)
+ .withValueMapper(new TextMessageMapperWithError())
+ .withQueue(QUEUE)
+ .withUsername(USERNAME)
+ .withPassword(PASSWORD));
+
+ PAssert.that(output.getFailedMessages()).containsInAnyOrder("Message 1", "Message 2");
+
+ pipeline.run();
+
+ Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD);
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE));
+ int count = 0;
+ while (consumer.receive(1000) != null) {
+ count++;
+ }
+ assertEquals(98, count);
+ }
+
+ @Test
+ public void testWriteDynamicMessage() throws Exception {
+ Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD);
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumerOne = session.createConsumer(session.createTopic("Topic_One"));
+ MessageConsumer consumerTwo = session.createConsumer(session.createTopic("Topic_Two"));
+ ArrayList<TestEvent> data = new ArrayList<>();
+ for (int i = 0; i < 50; i++) {
+ data.add(new TestEvent("Topic_One", "Message One " + i));
+ }
+ for (int i = 0; i < 100; i++) {
+ data.add(new TestEvent("Topic_Two", "Message Two " + i));
+ }
+ pipeline
+ .apply(Create.of(data))
+ .apply(
+ JmsIO.<TestEvent>write()
+ .withConnectionFactory(connectionFactory)
+ .withUsername(USERNAME)
+ .withPassword(PASSWORD)
+ .withTopicNameMapper(e -> e.getTopicName())
+ .withValueMapper(
+ (e, s) -> {
+ try {
+ TextMessage msg = s.createTextMessage();
+ msg.setText(e.getValue());
+ return msg;
+ } catch (JMSException ex) {
+ throw new JmsIOException("Error writing TextMessage", ex);
+ }
+ }));
+
+ pipeline.run();
+
+ int count = 0;
+ while (consumerOne.receive(1000) != null) {
+ count++;
+ }
+ assertEquals(50, count);
+
+ count = 0;
+ while (consumerTwo.receive(1000) != null) {
+ count++;
+ }
+ assertEquals(100, count);
+ }
+
@Test
public void testSplitForQueue() throws Exception {
JmsIO.Read read = JmsIO.read().withQueue(QUEUE);
@@ -555,4 +640,39 @@ public class JmsIOTest {
return result;
});
}
+
+ private static class TestEvent implements Serializable {
+ private final String topicName;
+ private final String value;
+
+ private TestEvent(String topicName, String value) {
+ this.topicName = topicName;
+ this.value = value;
+ }
+
+ private String getTopicName() {
+ return this.topicName;
+ }
+
+ private String getValue() {
+ return this.value;
+ }
+ }
+
+ private static class TextMessageMapperWithError
+ implements SerializableBiFunction<String, Session, Message> {
+ @Override
+ public Message apply(String value, Session session) {
+ try {
+ if (value.equals("Message 1") || value.equals("Message 2")) {
+ throw new JMSException("Error!!");
+ }
+ TextMessage msg = session.createTextMessage();
+ msg.setText(value);
+ return msg;
+ } catch (JMSException e) {
+ throw new JmsIOException("Error creating TextMessage", e);
+ }
+ }
+ }
}
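The destination validation added to `JmsIO.Write#expand` relies on counting how many of the three destination options are set. The following is a small stand-alone sketch of that check, assuming a hypothetical class `DestinationCheckSketch` and helper `exactlyOneSet` that take plain `Object` arguments in place of the real queue name, topic name, and topic name mapper.

```java
import java.util.stream.Stream;

/** Stand-alone model of the "exactly one destination" validation; names are hypothetical. */
class DestinationCheckSketch {

  /** Returns true only when exactly one of the three destination options is non-null. */
  static boolean exactlyOneSet(Object queue, Object topic, Object topicNameMapper) {
    return Stream.of(queue != null, topic != null, topicNameMapper != null)
            .filter(isSet -> isSet)
            .count()
        == 1;
  }

  public static void main(String[] args) {
    // One destination configured: accepted.
    System.out.println(exactlyOneSet("my-queue", null, null));
    // Queue and topic configured together: rejected.
    System.out.println(exactlyOneSet("my-queue", "my-topic", null));
    // Nothing configured: rejected.
    System.out.println(exactlyOneSet(null, null, null));
  }
}
```

Because a count of zero also fails the `== 1` comparison, this single check covers both the "nothing set" and the "more than one set" cases. The commit keeps a separate "at least one" check ahead of it, which lets that case report a more specific error message.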