You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2016/12/19 06:52:22 UTC
[1/3] incubator-beam git commit: [BEAM-716] Use AutoValue in JmsIO
Repository: incubator-beam
Updated Branches:
refs/heads/master 1c9bf8d66 -> 1e148cd7d
[BEAM-716] Use AutoValue in JmsIO
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/caf1c720
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/caf1c720
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/caf1c720
Branch: refs/heads/master
Commit: caf1c720f66de4d502f79b6c11c64b49c53329b0
Parents: 1c9bf8d
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Sun Dec 11 07:43:41 2016 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Mon Dec 19 07:24:00 2016 +0100
----------------------------------------------------------------------
sdks/java/io/jms/pom.xml | 7 +
.../java/org/apache/beam/sdk/io/jms/JmsIO.java | 321 +++++++++++++------
2 files changed, 228 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/caf1c720/sdks/java/io/jms/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/pom.xml b/sdks/java/io/jms/pom.xml
index bca0152..b88254e 100644
--- a/sdks/java/io/jms/pom.xml
+++ b/sdks/java/io/jms/pom.xml
@@ -81,6 +81,13 @@
<artifactId>jsr305</artifactId>
</dependency>
+ <!-- compile dependencies -->
+ <dependency>
+ <groupId>com.google.auto.value</groupId>
+ <artifactId>auto-value</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
<!-- test dependencies -->
<dependency>
<groupId>org.apache.activemq</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/caf1c720/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index 24fa67d..76dee67 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.jms;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
@@ -101,37 +102,148 @@ public class JmsIO {
private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class);
public static Read read() {
- return new Read(null, null, null, Long.MAX_VALUE, null);
+ return new AutoValue_JmsIO_Read.Builder().setMaxNumRecords(Long.MAX_VALUE).build();
}
public static Write write() {
- return new Write(null, null, null);
+ return new AutoValue_JmsIO_Write.Builder().build();
}
/**
* A {@link PTransform} to read from a JMS destination. See {@link JmsIO} for more
* information on usage and configuration.
*/
- public static class Read extends PTransform<PBegin, PCollection<JmsRecord>> {
+ @AutoValue
+ public abstract static class Read extends PTransform<PBegin, PCollection<JmsRecord>> {
+ /**
+ * NB: According to http://docs.oracle.com/javaee/1.4/api/javax/jms/ConnectionFactory.html
+ * "It is expected that JMS providers will provide the tools an administrator needs to create
+ * and configure administered objects in a JNDI namespace. JMS provider implementations of
+ * administered objects should be both javax.jndi.Referenceable and java.io.Serializable so
+ * that they can be stored in all JNDI naming contexts. In addition, it is recommended that
+ * these implementations follow the JavaBeansTM design patterns."
+ *
+ * <p>So, a {@link ConnectionFactory} implementation is serializable.
+ */
+ @Nullable abstract ConnectionFactory getConnectionFactory();
+ @Nullable abstract String getQueue();
+ @Nullable abstract String getTopic();
+ abstract long getMaxNumRecords();
+ @Nullable abstract Duration getMaxReadTime();
+
+ abstract Builder builder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setConnectionFactory(ConnectionFactory connectionFactory);
+ abstract Builder setQueue(String queue);
+ abstract Builder setTopic(String topic);
+ abstract Builder setMaxNumRecords(long maxNumRecords);
+ abstract Builder setMaxReadTime(Duration maxReadTime);
+ abstract Read build();
+ }
+
+ /**
+ * <p>Specify the JMS connection factory to connect to the JMS broker.
+ *
+ * <p>For instance:
+ *
+ * <pre>
+ * {@code
+ * pipeline.apply(JmsIO.read().withConnectionFactory(myConnectionFactory)
+ * }
+ * </pre>
+ *
+ * @param connectionFactory The JMS {@link ConnectionFactory}.
+ * @return The corresponding {@link JmsIO.Read}.
+ */
public Read withConnectionFactory(ConnectionFactory connectionFactory) {
- return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime);
+ return builder().setConnectionFactory(connectionFactory).build();
}
+ /**
+ * <p>Specify the JMS queue destination name where to read messages from. The
+ * {@link JmsIO.Read} acts as a consumer on the queue.
+ *
+ * <p>This method is exclusive with {@link JmsIO.Read#withTopic(String)}. The user has to
+ * specify a destination: queue or topic.
+ *
+ * <p>For instance:
+ *
+ * <pre>
+ * {@code
+ * pipeline.apply(JmsIO.read().withQueue("my-queue")
+ * }
+ * </pre>
+ *
+ * @param queue The JMS queue name where to read messages from.
+ * @return The corresponding {@link JmsIO.Read}.
+ */
public Read withQueue(String queue) {
- return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime);
+ return builder().setQueue(queue).build();
}
+ /**
+ * <p>Specify the JMS topic destination name where to receive messages from. The
+ * {@link JmsIO.Read} acts as a subscriber on the topic.
+ *
+ * <p>This method is exclusive with {@link JmsIO.Read#withQueue(String)}. The user has to
+ * specify a destination: queue or topic.
+ *
+ * <p>For instance:
+ *
+ * <pre>
+ * {@code
+ * pipeline.apply(JmsIO.read().withTopic("my-topic")
+ * }
+ * </pre>
+ *
+ * @param topic The JMS topic name.
+ * @return The corresponding {@link JmsIO.Read}.
+ */
public Read withTopic(String topic) {
- return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime);
+ return builder().setTopic(topic).build();
}
+ /**
+ * <p>Define the max number of records that the source will read. Using a max number of records
+ * different from {@code Long.MAX_VALUE} means the source will be {@code Bounded}, and will
+ * stop once the max number of records read is reached.
+ *
+ * <p>For instance:
+ *
+ * <pre>
+ * {@code
+ * pipeline.apply(JmsIO.read().withNumRecords(1000)
+ * }
+ * </pre>
+ *
+ * @param maxNumRecords The max number of records to read from the JMS destination.
+ * @return The corresponding {@link JmsIO.Read}.
+ */
public Read withMaxNumRecords(long maxNumRecords) {
- return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime);
+ return builder().setMaxNumRecords(maxNumRecords).build();
}
+ /**
+ * <p>Define the max read time that the source will read. Using a non null max read time
+ * duration means the source will be {@code Bounded}, and will stop once the max read time is
+ * reached.
+ *
+ * <p>For instance:
+ *
+ * <pre>
+ * {@code
+ * pipeline.apply(JmsIO.read().withMaxReadTime(Duration.minutes(10))
+ * }
+ * </pre>
+ *
+ * @param maxReadTime The max read time duration.
+ * @return The corresponding {@link JmsIO.Read}.
+ */
public Read withMaxReadTime(Duration maxReadTime) {
- return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime);
+ return builder().setMaxReadTime(maxReadTime).build();
}
@Override
@@ -141,10 +253,10 @@ public class JmsIO {
PTransform<PBegin, PCollection<JmsRecord>> transform = unbounded;
- if (maxNumRecords != Long.MAX_VALUE) {
- transform = unbounded.withMaxNumRecords(maxNumRecords);
- } else if (maxReadTime != null) {
- transform = unbounded.withMaxReadTime(maxReadTime);
+ if (getMaxNumRecords() != Long.MAX_VALUE) {
+ transform = unbounded.withMaxNumRecords(getMaxNumRecords());
+ } else if (getMaxReadTime() != null) {
+ transform = unbounded.withMaxReadTime(getMaxReadTime());
}
return input.getPipeline().apply(transform);
@@ -152,65 +264,29 @@ public class JmsIO {
@Override
public void validate(PBegin input) {
- checkNotNull(connectionFactory, "ConnectionFactory not specified");
- checkArgument((queue != null || topic != null), "Either queue or topic not specified");
+ checkNotNull(getConnectionFactory(), "ConnectionFactory not specified");
+ checkArgument((getQueue() != null || getTopic() != null), "Either queue or topic not "
+ + "specified");
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
-
- builder.addIfNotNull(DisplayData.item("queue", queue));
- builder.addIfNotNull(DisplayData.item("topic", topic));
+ builder.addIfNotNull(DisplayData.item("queue", getQueue()));
+ builder.addIfNotNull(DisplayData.item("topic", getTopic()));
}
///////////////////////////////////////////////////////////////////////////////////////
/**
- * NB: According to http://docs.oracle.com/javaee/1.4/api/javax/jms/ConnectionFactory.html
- * "It is expected that JMS providers will provide the tools an administrator needs to create
- * and configure administered objects in a JNDI namespace. JMS provider implementations of
- * administered objects should be both javax.jndi.Referenceable and java.io.Serializable so
- * that they can be stored in all JNDI naming contexts. In addition, it is recommended that
- * these implementations follow the JavaBeansTM design patterns."
- *
- * <p>So, a {@link ConnectionFactory} implementation is serializable.
- */
- protected ConnectionFactory connectionFactory;
- @Nullable
- protected String queue;
- @Nullable
- protected String topic;
- protected long maxNumRecords;
- protected Duration maxReadTime;
-
- private Read(
- ConnectionFactory connectionFactory,
- String queue,
- String topic,
- long maxNumRecords,
- Duration maxReadTime) {
- super("JmsIO.Read");
-
- this.connectionFactory = connectionFactory;
- this.queue = queue;
- this.topic = topic;
- this.maxNumRecords = maxNumRecords;
- this.maxReadTime = maxReadTime;
- }
-
- /**
* Creates an {@link UnboundedSource UnboundedSource<JmsRecord, ?>} with the configuration
* in {@link Read}. Primary use case is unit tests, should not be used in an
* application.
*/
@VisibleForTesting
UnboundedSource<JmsRecord, JmsCheckpointMark> createSource() {
- return new UnboundedJmsSource(
- connectionFactory,
- queue,
- topic);
+ return new UnboundedJmsSource(this);
}
}
@@ -219,17 +295,10 @@ public class JmsIO {
private static class UnboundedJmsSource extends UnboundedSource<JmsRecord, JmsCheckpointMark> {
- private final ConnectionFactory connectionFactory;
- private final String queue;
- private final String topic;
+ private final Read spec;
- public UnboundedJmsSource(
- ConnectionFactory connectionFactory,
- String queue,
- String topic) {
- this.connectionFactory = connectionFactory;
- this.queue = queue;
- this.topic = topic;
+ public UnboundedJmsSource(Read spec) {
+ this.spec = spec;
}
@Override
@@ -237,7 +306,7 @@ public class JmsIO {
int desiredNumSplits, PipelineOptions options) throws Exception {
List<UnboundedJmsSource> sources = new ArrayList<>();
for (int i = 0; i < desiredNumSplits; i++) {
- sources.add(new UnboundedJmsSource(connectionFactory, queue, topic));
+ sources.add(new UnboundedJmsSource(spec));
}
return sources;
}
@@ -250,8 +319,7 @@ public class JmsIO {
@Override
public void validate() {
- checkNotNull(connectionFactory, "ConnectionFactory is not defined");
- checkArgument((queue != null || topic != null), "Either queue or topic is not defined");
+ spec.validate(null);
}
@Override
@@ -291,15 +359,17 @@ public class JmsIO {
@Override
public boolean start() throws IOException {
- ConnectionFactory connectionFactory = source.connectionFactory;
+ ConnectionFactory connectionFactory = source.spec.getConnectionFactory();
try {
this.connection = connectionFactory.createConnection();
this.connection.start();
this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- if (source.topic != null) {
- this.consumer = this.session.createConsumer(this.session.createTopic(source.topic));
+ if (source.spec.getTopic() != null) {
+ this.consumer =
+ this.session.createConsumer(this.session.createTopic(source.spec.getTopic()));
} else {
- this.consumer = this.session.createConsumer(this.session.createQueue(source.queue));
+ this.consumer =
+ this.session.createConsumer(this.session.createQueue(source.spec.getQueue()));
}
return advance();
@@ -409,70 +479,122 @@ public class JmsIO {
* A {@link PTransform} to write to a JMS queue. See {@link JmsIO} for
* more information on usage and configuration.
*/
- public static class Write extends PTransform<PCollection<String>, PDone> {
+ @AutoValue
+ public abstract static class Write extends PTransform<PCollection<String>, PDone> {
- protected ConnectionFactory connectionFactory;
- protected String queue;
- protected String topic;
+ @Nullable abstract ConnectionFactory getConnectionFactory();
+ @Nullable abstract String getQueue();
+ @Nullable abstract String getTopic();
+ abstract Builder builder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setConnectionFactory(ConnectionFactory connectionFactory);
+ abstract Builder setQueue(String queue);
+ abstract Builder setTopic(String topic);
+ abstract Write build();
+ }
+
+ /**
+ * <p>Specify the JMS connection factory to connect to the JMS broker.
+ *
+ * <p>For instance:
+ *
+ * <pre>
+ * {@code
+ * .apply(JmsIO.write().withConnectionFactory(myConnectionFactory)
+ * }
+ * </pre>
+ *
+ * @param connectionFactory The JMS {@link ConnectionFactory}.
+ * @return The corresponding {@link JmsIO.Read}.
+ */
public Write withConnectionFactory(ConnectionFactory connectionFactory) {
- return new Write(connectionFactory, queue, topic);
+ return builder().setConnectionFactory(connectionFactory).build();
}
+ /**
+ * <p>Specify the JMS queue destination name where to send messages to. The
+ * {@link JmsIO.Write} acts as a producer on the queue.
+ *
+ * <p>This method is exclusive with {@link JmsIO.Write#withTopic(String)}. The user has to
+ * specify a destination: queue or topic.
+ *
+ * <p>For instance:
+ *
+ * <pre>
+ * {@code
+ * .apply(JmsIO.write().withQueue("my-queue")
+ * }
+ * </pre>
+ *
+ * @param queue The JMS queue name where to send messages to.
+ * @return The corresponding {@link JmsIO.Read}.
+ */
public Write withQueue(String queue) {
- return new Write(connectionFactory, queue, topic);
+ return builder().setQueue(queue).build();
}
+ /**
+ * <p>Specify the JMS topic destination name where to send messages to. The
+ * {@link JmsIO.Read} acts as a publisher on the topic.
+ *
+ * <p>This method is exclusive with {@link JmsIO.Write#withQueue(String)}. The user has to
+ * specify a destination: queue or topic.
+ *
+ * <p>For instance:
+ *
+ * <pre>
+ * {@code
+ * .apply(JmsIO.write().withTopic("my-topic")
+ * }
+ * </pre>
+ *
+ * @param topic The JMS topic name.
+ * @return The corresponding {@link JmsIO.Read}.
+ */
public Write withTopic(String topic) {
- return new Write(connectionFactory, queue, topic);
- }
-
- private Write(ConnectionFactory connectionFactory, String queue, String topic) {
- this.connectionFactory = connectionFactory;
- this.queue = queue;
- this.topic = topic;
+ return builder().setTopic(topic).build();
}
@Override
public PDone expand(PCollection<String> input) {
- input.apply(ParDo.of(new JmsWriter(connectionFactory, queue, topic)));
+ input.apply(ParDo.of(new WriterFn(this)));
return PDone.in(input.getPipeline());
}
@Override
public void validate(PCollection<String> input) {
- checkNotNull(connectionFactory, "ConnectionFactory is not defined");
- checkArgument((queue != null || topic != null), "Either queue or topic is required");
+ checkNotNull(getConnectionFactory(), "ConnectionFactory is not defined");
+ checkArgument((getQueue() != null || getTopic() != null), "Either queue or topic is "
+ + "required");
}
- private static class JmsWriter extends DoFn<String, Void> {
+ private static class WriterFn extends DoFn<String, Void> {
- private ConnectionFactory connectionFactory;
- private String queue;
- private String topic;
+ private Write spec;
private Connection connection;
private Session session;
private MessageProducer producer;
- public JmsWriter(ConnectionFactory connectionFactory, String queue, String topic) {
- this.connectionFactory = connectionFactory;
- this.queue = queue;
- this.topic = topic;
+ public WriterFn(Write spec) {
+ this.spec = spec;
}
@StartBundle
public void startBundle(Context c) throws Exception {
if (producer == null) {
- this.connection = connectionFactory.createConnection();
+ this.connection = spec.getConnectionFactory().createConnection();
this.connection.start();
// false means we don't use JMS transaction.
this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination;
- if (queue != null) {
- destination = session.createQueue(queue);
+ if (spec.getQueue() != null) {
+ destination = session.createQueue(spec.getQueue());
} else {
- destination = session.createTopic(topic);
+ destination = session.createTopic(spec.getTopic());
}
this.producer = this.session.createProducer(destination);
}
@@ -481,7 +603,6 @@ public class JmsIO {
@ProcessElement
public void processElement(ProcessContext ctx) throws Exception {
String value = ctx.element();
-
try {
TextMessage message = session.createTextMessage(value);
producer.send(message);
[3/3] incubator-beam git commit: [BEAM-716] This closes #1577
Posted by jb...@apache.org.
[BEAM-716] This closes #1577
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1e148cd7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1e148cd7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1e148cd7
Branch: refs/heads/master
Commit: 1e148cd7d5f12e6742ac57440bf0731460d11b80
Parents: 1c9bf8d 30e14cf
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Mon Dec 19 07:40:39 2016 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Mon Dec 19 07:40:39 2016 +0100
----------------------------------------------------------------------
sdks/java/io/jms/pom.xml | 7 +
.../java/org/apache/beam/sdk/io/jms/JmsIO.java | 338 +++++++++++++------
2 files changed, 244 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-beam git commit: [BEAM-716] Fix javadoc on with*
methods [BEAM-959] Improve check preconditions in JmsIO
Posted by jb...@apache.org.
[BEAM-716] Fix javadoc on with* methods
[BEAM-959] Improve check preconditions in JmsIO
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/30e14cfa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/30e14cfa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/30e14cfa
Branch: refs/heads/master
Commit: 30e14cfa63db50d567185599ea049c96229b48e2
Parents: caf1c72
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Tue Dec 13 21:55:46 2016 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Mon Dec 19 07:24:05 2016 +0100
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/jms/JmsIO.java | 45 +++++++++++++-------
1 file changed, 30 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30e14cfa/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index 76dee67..b6de26a 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -18,7 +18,7 @@
package org.apache.beam.sdk.io.jms;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
@@ -145,7 +145,7 @@ public class JmsIO {
}
/**
- * <p>Specify the JMS connection factory to connect to the JMS broker.
+ * Specify the JMS connection factory to connect to the JMS broker.
*
* <p>For instance:
*
@@ -159,11 +159,13 @@ public class JmsIO {
* @return The corresponding {@link JmsIO.Read}.
*/
public Read withConnectionFactory(ConnectionFactory connectionFactory) {
+ checkArgument(connectionFactory != null, "withConnectionFactory(connectionFactory) called"
+ + " with null connectionFactory");
return builder().setConnectionFactory(connectionFactory).build();
}
/**
- * <p>Specify the JMS queue destination name where to read messages from. The
+ * Specify the JMS queue destination name where to read messages from. The
* {@link JmsIO.Read} acts as a consumer on the queue.
*
* <p>This method is exclusive with {@link JmsIO.Read#withTopic(String)}. The user has to
@@ -181,11 +183,12 @@ public class JmsIO {
* @return The corresponding {@link JmsIO.Read}.
*/
public Read withQueue(String queue) {
+ checkArgument(queue != null, "withQueue(queue) called with null queue");
return builder().setQueue(queue).build();
}
/**
- * <p>Specify the JMS topic destination name where to receive messages from. The
+ * Specify the JMS topic destination name where to receive messages from. The
* {@link JmsIO.Read} acts as a subscriber on the topic.
*
* <p>This method is exclusive with {@link JmsIO.Read#withQueue(String)}. The user has to
@@ -203,11 +206,12 @@ public class JmsIO {
* @return The corresponding {@link JmsIO.Read}.
*/
public Read withTopic(String topic) {
+ checkArgument(topic != null, "withTopic(topic) called with null topic");
return builder().setTopic(topic).build();
}
/**
- * <p>Define the max number of records that the source will read. Using a max number of records
+ * Define the max number of records that the source will read. Using a max number of records
* different from {@code Long.MAX_VALUE} means the source will be {@code Bounded}, and will
* stop once the max number of records read is reached.
*
@@ -223,11 +227,13 @@ public class JmsIO {
* @return The corresponding {@link JmsIO.Read}.
*/
public Read withMaxNumRecords(long maxNumRecords) {
+ checkArgument(maxNumRecords >= 0, "withMaxNumRecords(maxNumRecords) called with invalid "
+ + "maxNumRecords");
return builder().setMaxNumRecords(maxNumRecords).build();
}
/**
- * <p>Define the max read time that the source will read. Using a non null max read time
+ * Define the max read time that the source will read. Using a non null max read time
* duration means the source will be {@code Bounded}, and will stop once the max read time is
* reached.
*
@@ -243,6 +249,8 @@ public class JmsIO {
* @return The corresponding {@link JmsIO.Read}.
*/
public Read withMaxReadTime(Duration maxReadTime) {
+ checkArgument(maxReadTime != null, "withMaxReadTime(maxReadTime) called with null "
+ + "maxReadTime");
return builder().setMaxReadTime(maxReadTime).build();
}
@@ -264,9 +272,11 @@ public class JmsIO {
@Override
public void validate(PBegin input) {
- checkNotNull(getConnectionFactory(), "ConnectionFactory not specified");
- checkArgument((getQueue() != null || getTopic() != null), "Either queue or topic not "
- + "specified");
+ checkState(getConnectionFactory() != null, "JmsIO.read() requires a JMS connection "
+ + "factory to be set via withConnectionFactory(connectionFactory)");
+ checkState((getQueue() != null || getTopic() != null), "JmsIO.read() requires a JMS "
+ + "destination (queue or topic) to be set via withQueue(queueName) or withTopic"
+ + "(topicName)");
}
@Override
@@ -497,7 +507,7 @@ public class JmsIO {
}
/**
- * <p>Specify the JMS connection factory to connect to the JMS broker.
+ * Specify the JMS connection factory to connect to the JMS broker.
*
* <p>For instance:
*
@@ -511,11 +521,13 @@ public class JmsIO {
* @return The corresponding {@link JmsIO.Read}.
*/
public Write withConnectionFactory(ConnectionFactory connectionFactory) {
+ checkArgument(connectionFactory != null, "withConnectionFactory(connectionFactory) called"
+ + " with null connectionFactory");
return builder().setConnectionFactory(connectionFactory).build();
}
/**
- * <p>Specify the JMS queue destination name where to send messages to. The
+ * Specify the JMS queue destination name where to send messages to. The
* {@link JmsIO.Write} acts as a producer on the queue.
*
* <p>This method is exclusive with {@link JmsIO.Write#withTopic(String)}. The user has to
@@ -533,11 +545,12 @@ public class JmsIO {
* @return The corresponding {@link JmsIO.Read}.
*/
public Write withQueue(String queue) {
+ checkArgument(queue != null, "withQueue(queue) called with null queue");
return builder().setQueue(queue).build();
}
/**
- * <p>Specify the JMS topic destination name where to send messages to. The
+ * Specify the JMS topic destination name where to send messages to. The
* {@link JmsIO.Read} acts as a publisher on the topic.
*
* <p>This method is exclusive with {@link JmsIO.Write#withQueue(String)}. The user has to
@@ -555,6 +568,7 @@ public class JmsIO {
* @return The corresponding {@link JmsIO.Read}.
*/
public Write withTopic(String topic) {
+ checkArgument(topic != null, "withTopic(topic) called with null topic");
return builder().setTopic(topic).build();
}
@@ -566,9 +580,10 @@ public class JmsIO {
@Override
public void validate(PCollection<String> input) {
- checkNotNull(getConnectionFactory(), "ConnectionFactory is not defined");
- checkArgument((getQueue() != null || getTopic() != null), "Either queue or topic is "
- + "required");
+ checkState(getConnectionFactory() != null, "JmsIO.write() requires a JMS connection "
+ + "factory to be set via withConnectionFactory(connectionFactory)");
+ checkState((getQueue() != null || getTopic() != null), "JmsIO.write() requires a JMS "
+ + "destination (queue or topic) to be set via withQueue(queue) or withTopic(topic)");
}
private static class WriterFn extends DoFn<String, Void> {