You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 20:23:34 UTC
[26/50] incubator-beam git commit: [BEAM-716] Use AutoValue in JmsIO
[BEAM-716] Use AutoValue in JmsIO
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/caf1c720
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/caf1c720
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/caf1c720
Branch: refs/heads/gearpump-runner
Commit: caf1c720f66de4d502f79b6c11c64b49c53329b0
Parents: 1c9bf8d
Author: Jean-Baptiste Onofr� <jb...@apache.org>
Authored: Sun Dec 11 07:43:41 2016 +0100
Committer: Jean-Baptiste Onofr� <jb...@apache.org>
Committed: Mon Dec 19 07:24:00 2016 +0100
----------------------------------------------------------------------
sdks/java/io/jms/pom.xml | 7 +
.../java/org/apache/beam/sdk/io/jms/JmsIO.java | 321 +++++++++++++------
2 files changed, 228 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/caf1c720/sdks/java/io/jms/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/pom.xml b/sdks/java/io/jms/pom.xml
index bca0152..b88254e 100644
--- a/sdks/java/io/jms/pom.xml
+++ b/sdks/java/io/jms/pom.xml
@@ -81,6 +81,13 @@
<artifactId>jsr305</artifactId>
</dependency>
+ <!-- compile dependencies -->
+ <dependency>
+ <groupId>com.google.auto.value</groupId>
+ <artifactId>auto-value</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
<!-- test dependencies -->
<dependency>
<groupId>org.apache.activemq</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/caf1c720/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index 24fa67d..76dee67 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.jms;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
@@ -101,37 +102,148 @@ public class JmsIO {
private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class);
public static Read read() {
- return new Read(null, null, null, Long.MAX_VALUE, null);
+ return new AutoValue_JmsIO_Read.Builder().setMaxNumRecords(Long.MAX_VALUE).build();
}
public static Write write() {
- return new Write(null, null, null);
+ return new AutoValue_JmsIO_Write.Builder().build();
}
/**
* A {@link PTransform} to read from a JMS destination. See {@link JmsIO} for more
* information on usage and configuration.
*/
- public static class Read extends PTransform<PBegin, PCollection<JmsRecord>> {
+ @AutoValue
+ public abstract static class Read extends PTransform<PBegin, PCollection<JmsRecord>> {
+ /**
+ * NB: According to http://docs.oracle.com/javaee/1.4/api/javax/jms/ConnectionFactory.html
+ * "It is expected that JMS providers will provide the tools an administrator needs to create
+ * and configure administered objects in a JNDI namespace. JMS provider implementations of
+ * administered objects should be both javax.jndi.Referenceable and java.io.Serializable so
+ * that they can be stored in all JNDI naming contexts. In addition, it is recommended that
+ * these implementations follow the JavaBeansTM design patterns."
+ *
+ * <p>So, a {@link ConnectionFactory} implementation is serializable.
+ */
+ @Nullable abstract ConnectionFactory getConnectionFactory();
+ @Nullable abstract String getQueue();
+ @Nullable abstract String getTopic();
+ abstract long getMaxNumRecords();
+ @Nullable abstract Duration getMaxReadTime();
+
+ abstract Builder builder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setConnectionFactory(ConnectionFactory connectionFactory);
+ abstract Builder setQueue(String queue);
+ abstract Builder setTopic(String topic);
+ abstract Builder setMaxNumRecords(long maxNumRecords);
+ abstract Builder setMaxReadTime(Duration maxReadTime);
+ abstract Read build();
+ }
+
+ /**
+ * <p>Specify the JMS connection factory to connect to the JMS broker.
+ *
+ * <p>For instance:
+ *
+ * <pre>
+ * {@code
+ * pipeline.apply(JmsIO.read().withConnectionFactory(myConnectionFactory)
+ * }
+ * </pre>
+ *
+ * @param connectionFactory The JMS {@link ConnectionFactory}.
+ * @return The corresponding {@link JmsIO.Read}.
+ */
public Read withConnectionFactory(ConnectionFactory connectionFactory) {
- return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime);
+ return builder().setConnectionFactory(connectionFactory).build();
}
+ /**
+ * <p>Specify the JMS queue destination name where to read messages from. The
+ * {@link JmsIO.Read} acts as a consumer on the queue.
+ *
+ * <p>This method is exclusive with {@link JmsIO.Read#withTopic(String)}. The user has to
+ * specify a destination: queue or topic.
+ *
+ * <p>For instance:
+ *
+ * <pre>
+ * {@code
+ * pipeline.apply(JmsIO.read().withQueue("my-queue")
+ * }
+ * </pre>
+ *
+ * @param queue The JMS queue name where to read messages from.
+ * @return The corresponding {@link JmsIO.Read}.
+ */
public Read withQueue(String queue) {
- return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime);
+ return builder().setQueue(queue).build();
}
+ /**
+ * <p>Specify the JMS topic destination name where to receive messages from. The
+ * {@link JmsIO.Read} acts as a subscriber on the topic.
+ *
+ * <p>This method is exclusive with {@link JmsIO.Read#withQueue(String)}. The user has to
+ * specify a destination: queue or topic.
+ *
+ * <p>For instance:
+ *
+ * <pre>
+ * {@code
+ * pipeline.apply(JmsIO.read().withTopic("my-topic")
+ * }
+ * </pre>
+ *
+ * @param topic The JMS topic name.
+ * @return The corresponding {@link JmsIO.Read}.
+ */
public Read withTopic(String topic) {
- return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime);
+ return builder().setTopic(topic).build();
}
+ /**
+ * <p>Define the max number of records that the source will read. Using a max number of records
+ * different from {@code Long.MAX_VALUE} means the source will be {@code Bounded}, and will
+ * stop once the max number of records read is reached.
+ *
+ * <p>For instance:
+ *
+ * <pre>
+ * {@code
+ * pipeline.apply(JmsIO.read().withNumRecords(1000)
+ * }
+ * </pre>
+ *
+ * @param maxNumRecords The max number of records to read from the JMS destination.
+ * @return The corresponding {@link JmsIO.Read}.
+ */
public Read withMaxNumRecords(long maxNumRecords) {
- return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime);
+ return builder().setMaxNumRecords(maxNumRecords).build();
}
+ /**
+ * <p>Define the max read time that the source will read. Using a non null max read time
+ * duration means the source will be {@code Bounded}, and will stop once the max read time is
+ * reached.
+ *
+ * <p>For instance:
+ *
+ * <pre>
+ * {@code
+ * pipeline.apply(JmsIO.read().withMaxReadTime(Duration.minutes(10))
+ * }
+ * </pre>
+ *
+ * @param maxReadTime The max read time duration.
+ * @return The corresponding {@link JmsIO.Read}.
+ */
public Read withMaxReadTime(Duration maxReadTime) {
- return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime);
+ return builder().setMaxReadTime(maxReadTime).build();
}
@Override
@@ -141,10 +253,10 @@ public class JmsIO {
PTransform<PBegin, PCollection<JmsRecord>> transform = unbounded;
- if (maxNumRecords != Long.MAX_VALUE) {
- transform = unbounded.withMaxNumRecords(maxNumRecords);
- } else if (maxReadTime != null) {
- transform = unbounded.withMaxReadTime(maxReadTime);
+ if (getMaxNumRecords() != Long.MAX_VALUE) {
+ transform = unbounded.withMaxNumRecords(getMaxNumRecords());
+ } else if (getMaxReadTime() != null) {
+ transform = unbounded.withMaxReadTime(getMaxReadTime());
}
return input.getPipeline().apply(transform);
@@ -152,65 +264,29 @@ public class JmsIO {
@Override
public void validate(PBegin input) {
- checkNotNull(connectionFactory, "ConnectionFactory not specified");
- checkArgument((queue != null || topic != null), "Either queue or topic not specified");
+ checkNotNull(getConnectionFactory(), "ConnectionFactory not specified");
+ checkArgument((getQueue() != null || getTopic() != null), "Either queue or topic not "
+ + "specified");
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
-
- builder.addIfNotNull(DisplayData.item("queue", queue));
- builder.addIfNotNull(DisplayData.item("topic", topic));
+ builder.addIfNotNull(DisplayData.item("queue", getQueue()));
+ builder.addIfNotNull(DisplayData.item("topic", getTopic()));
}
///////////////////////////////////////////////////////////////////////////////////////
/**
- * NB: According to http://docs.oracle.com/javaee/1.4/api/javax/jms/ConnectionFactory.html
- * "It is expected that JMS providers will provide the tools an administrator needs to create
- * and configure administered objects in a JNDI namespace. JMS provider implementations of
- * administered objects should be both javax.jndi.Referenceable and java.io.Serializable so
- * that they can be stored in all JNDI naming contexts. In addition, it is recommended that
- * these implementations follow the JavaBeansTM design patterns."
- *
- * <p>So, a {@link ConnectionFactory} implementation is serializable.
- */
- protected ConnectionFactory connectionFactory;
- @Nullable
- protected String queue;
- @Nullable
- protected String topic;
- protected long maxNumRecords;
- protected Duration maxReadTime;
-
- private Read(
- ConnectionFactory connectionFactory,
- String queue,
- String topic,
- long maxNumRecords,
- Duration maxReadTime) {
- super("JmsIO.Read");
-
- this.connectionFactory = connectionFactory;
- this.queue = queue;
- this.topic = topic;
- this.maxNumRecords = maxNumRecords;
- this.maxReadTime = maxReadTime;
- }
-
- /**
* Creates an {@link UnboundedSource UnboundedSource<JmsRecord, ?>} with the configuration
* in {@link Read}. Primary use case is unit tests, should not be used in an
* application.
*/
@VisibleForTesting
UnboundedSource<JmsRecord, JmsCheckpointMark> createSource() {
- return new UnboundedJmsSource(
- connectionFactory,
- queue,
- topic);
+ return new UnboundedJmsSource(this);
}
}
@@ -219,17 +295,10 @@ public class JmsIO {
private static class UnboundedJmsSource extends UnboundedSource<JmsRecord, JmsCheckpointMark> {
- private final ConnectionFactory connectionFactory;
- private final String queue;
- private final String topic;
+ private final Read spec;
- public UnboundedJmsSource(
- ConnectionFactory connectionFactory,
- String queue,
- String topic) {
- this.connectionFactory = connectionFactory;
- this.queue = queue;
- this.topic = topic;
+ public UnboundedJmsSource(Read spec) {
+ this.spec = spec;
}
@Override
@@ -237,7 +306,7 @@ public class JmsIO {
int desiredNumSplits, PipelineOptions options) throws Exception {
List<UnboundedJmsSource> sources = new ArrayList<>();
for (int i = 0; i < desiredNumSplits; i++) {
- sources.add(new UnboundedJmsSource(connectionFactory, queue, topic));
+ sources.add(new UnboundedJmsSource(spec));
}
return sources;
}
@@ -250,8 +319,7 @@ public class JmsIO {
@Override
public void validate() {
- checkNotNull(connectionFactory, "ConnectionFactory is not defined");
- checkArgument((queue != null || topic != null), "Either queue or topic is not defined");
+ spec.validate(null);
}
@Override
@@ -291,15 +359,17 @@ public class JmsIO {
@Override
public boolean start() throws IOException {
- ConnectionFactory connectionFactory = source.connectionFactory;
+ ConnectionFactory connectionFactory = source.spec.getConnectionFactory();
try {
this.connection = connectionFactory.createConnection();
this.connection.start();
this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- if (source.topic != null) {
- this.consumer = this.session.createConsumer(this.session.createTopic(source.topic));
+ if (source.spec.getTopic() != null) {
+ this.consumer =
+ this.session.createConsumer(this.session.createTopic(source.spec.getTopic()));
} else {
- this.consumer = this.session.createConsumer(this.session.createQueue(source.queue));
+ this.consumer =
+ this.session.createConsumer(this.session.createQueue(source.spec.getQueue()));
}
return advance();
@@ -409,70 +479,122 @@ public class JmsIO {
* A {@link PTransform} to write to a JMS queue. See {@link JmsIO} for
* more information on usage and configuration.
*/
- public static class Write extends PTransform<PCollection<String>, PDone> {
+ @AutoValue
+ public abstract static class Write extends PTransform<PCollection<String>, PDone> {
- protected ConnectionFactory connectionFactory;
- protected String queue;
- protected String topic;
+ @Nullable abstract ConnectionFactory getConnectionFactory();
+ @Nullable abstract String getQueue();
+ @Nullable abstract String getTopic();
+ abstract Builder builder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setConnectionFactory(ConnectionFactory connectionFactory);
+ abstract Builder setQueue(String queue);
+ abstract Builder setTopic(String topic);
+ abstract Write build();
+ }
+
+ /**
+ * <p>Specify the JMS connection factory to connect to the JMS broker.
+ *
+ * <p>For instance:
+ *
+ * <pre>
+ * {@code
+ * .apply(JmsIO.write().withConnectionFactory(myConnectionFactory)
+ * }
+ * </pre>
+ *
+ * @param connectionFactory The JMS {@link ConnectionFactory}.
+ * @return The corresponding {@link JmsIO.Read}.
+ */
public Write withConnectionFactory(ConnectionFactory connectionFactory) {
- return new Write(connectionFactory, queue, topic);
+ return builder().setConnectionFactory(connectionFactory).build();
}
+ /**
+ * <p>Specify the JMS queue destination name where to send messages to. The
+ * {@link JmsIO.Write} acts as a producer on the queue.
+ *
+ * <p>This method is exclusive with {@link JmsIO.Write#withTopic(String)}. The user has to
+ * specify a destination: queue or topic.
+ *
+ * <p>For instance:
+ *
+ * <pre>
+ * {@code
+ * .apply(JmsIO.write().withQueue("my-queue")
+ * }
+ * </pre>
+ *
+ * @param queue The JMS queue name where to send messages to.
+ * @return The corresponding {@link JmsIO.Read}.
+ */
public Write withQueue(String queue) {
- return new Write(connectionFactory, queue, topic);
+ return builder().setQueue(queue).build();
}
+ /**
+ * <p>Specify the JMS topic destination name where to send messages to. The
+ * {@link JmsIO.Read} acts as a publisher on the topic.
+ *
+ * <p>This method is exclusive with {@link JmsIO.Write#withQueue(String)}. The user has to
+ * specify a destination: queue or topic.
+ *
+ * <p>For instance:
+ *
+ * <pre>
+ * {@code
+ * .apply(JmsIO.write().withTopic("my-topic")
+ * }
+ * </pre>
+ *
+ * @param topic The JMS topic name.
+ * @return The corresponding {@link JmsIO.Read}.
+ */
public Write withTopic(String topic) {
- return new Write(connectionFactory, queue, topic);
- }
-
- private Write(ConnectionFactory connectionFactory, String queue, String topic) {
- this.connectionFactory = connectionFactory;
- this.queue = queue;
- this.topic = topic;
+ return builder().setTopic(topic).build();
}
@Override
public PDone expand(PCollection<String> input) {
- input.apply(ParDo.of(new JmsWriter(connectionFactory, queue, topic)));
+ input.apply(ParDo.of(new WriterFn(this)));
return PDone.in(input.getPipeline());
}
@Override
public void validate(PCollection<String> input) {
- checkNotNull(connectionFactory, "ConnectionFactory is not defined");
- checkArgument((queue != null || topic != null), "Either queue or topic is required");
+ checkNotNull(getConnectionFactory(), "ConnectionFactory is not defined");
+ checkArgument((getQueue() != null || getTopic() != null), "Either queue or topic is "
+ + "required");
}
- private static class JmsWriter extends DoFn<String, Void> {
+ private static class WriterFn extends DoFn<String, Void> {
- private ConnectionFactory connectionFactory;
- private String queue;
- private String topic;
+ private Write spec;
private Connection connection;
private Session session;
private MessageProducer producer;
- public JmsWriter(ConnectionFactory connectionFactory, String queue, String topic) {
- this.connectionFactory = connectionFactory;
- this.queue = queue;
- this.topic = topic;
+ public WriterFn(Write spec) {
+ this.spec = spec;
}
@StartBundle
public void startBundle(Context c) throws Exception {
if (producer == null) {
- this.connection = connectionFactory.createConnection();
+ this.connection = spec.getConnectionFactory().createConnection();
this.connection.start();
// false means we don't use JMS transaction.
this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination;
- if (queue != null) {
- destination = session.createQueue(queue);
+ if (spec.getQueue() != null) {
+ destination = session.createQueue(spec.getQueue());
} else {
- destination = session.createTopic(topic);
+ destination = session.createTopic(spec.getTopic());
}
this.producer = this.session.createProducer(destination);
}
@@ -481,7 +603,6 @@ public class JmsIO {
@ProcessElement
public void processElement(ProcessContext ctx) throws Exception {
String value = ctx.element();
-
try {
TextMessage message = session.createTextMessage(value);
producer.send(message);