You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/05/31 19:27:49 UTC
[1/2] beam git commit: [BEAM-2246] Use CLIENT_ACK instead of AUTO_ACK
in JmsIO
Repository: beam
Updated Branches:
refs/heads/master 4884d4867 -> 2df9dbd24
[BEAM-2246] Use CLIENT_ACK instead of AUTO_ACK in JmsIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a158fc17
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a158fc17
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a158fc17
Branch: refs/heads/master
Commit: a158fc178e1297f04f4f18975383ec1dc69bc0d8
Parents: 4884d48
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed May 10 07:39:56 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed May 31 21:24:22 2017 +0200
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/jms/JmsIO.java | 5 +-
.../org/apache/beam/sdk/io/jms/JmsIOTest.java | 78 ++++++++++++++++++++
2 files changed, 81 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a158fc17/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index b8355ad..c5e5150 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -379,7 +379,8 @@ public class JmsIO {
}
- private static class UnboundedJmsReader extends UnboundedReader<JmsRecord> {
+ @VisibleForTesting
+ static class UnboundedJmsReader extends UnboundedReader<JmsRecord> {
private UnboundedJmsSource source;
private JmsCheckpointMark checkpointMark;
@@ -421,7 +422,7 @@ public class JmsIO {
}
try {
- this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ this.session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
} catch (Exception e) {
throw new IOException("Error creating JMS session", e);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a158fc17/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
index 7edda1a..43c050e 100644
--- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
+++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
@@ -23,10 +23,12 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Enumeration;
import java.util.List;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -34,6 +36,7 @@ import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
@@ -71,6 +74,7 @@ public class JmsIOTest {
private BrokerService broker;
private ConnectionFactory connectionFactory;
+ private ConnectionFactory connectionFactoryWithoutPrefetch;
@Rule
public final transient TestPipeline pipeline = TestPipeline.create();
@@ -98,6 +102,8 @@ public class JmsIOTest {
// create JMS connection factory
connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
+ connectionFactoryWithoutPrefetch =
+ new ActiveMQConnectionFactory(BROKER_URL + "?jms.prefetchPolicy.all=0");
}
@After
@@ -236,4 +242,76 @@ public class JmsIOTest {
assertEquals(1, splits.size());
}
+ /**
+ * Checks that messages read by the unbounded JMS reader remain pending in the queue until the
+ * checkpoint mark is finalized. Ten messages are produced, then read in two batches (4, then 6);
+ * the queue depth is checked with {@code count(QUEUE)} before and after each
+ * {@code finalizeCheckpoint()} call.
+ */
+ @Test
+ public void testCheckpointMark() throws Exception {
+ // We use a connection factory with prefetch disabled here.
+ // Prefetch is an ActiveMQ feature: to make efficient use of network resources, the broker
+ // uses a 'push' model to dispatch messages to consumers. However, in the case of our test,
+ // it means that there can be some latency between the receiveNoWait() call made by the
+ // consumer and the prefetch buffer being populated by the broker. Setting the prefetch to 0
+ // means that the consumer polls for messages, which is exactly what we want for the test.
+ Connection connection = connectionFactoryWithoutPrefetch.createConnection(USERNAME, PASSWORD);
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(session.createQueue(QUEUE));
+ for (int i = 0; i < 10; i++) {
+ producer.send(session.createTextMessage("test " + i));
+ }
+ producer.close();
+ session.close();
+ connection.close();
+
+ JmsIO.Read spec = JmsIO.read()
+ .withConnectionFactory(connectionFactoryWithoutPrefetch)
+ .withUsername(USERNAME)
+ .withPassword(PASSWORD)
+ .withQueue(QUEUE);
+ JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+ JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
+
+ // start the reader and move to the first record
+ assertTrue(reader.start());
+
+ // read 3 more messages (NB: start() already read the first one, so 4 have been read in total)
+ for (int i = 0; i < 3; i++) {
+ assertTrue(reader.advance());
+ }
+
+ // all 10 messages are still pending in the queue (no ACK yet)
+ assertEquals(10, count(QUEUE));
+
+ // we finalize the checkpoint
+ reader.getCheckpointMark().finalizeCheckpoint();
+
+ // finalizing the checkpoint acks the 4 messages read so far, so they are no longer pending
+ // in the queue
+ assertEquals(6, count(QUEUE));
+
+ // we read the 6 remaining messages
+ for (int i = 0; i < 6; i++) {
+ assertTrue(reader.advance());
+ }
+
+ // still 6 pending messages, as we have not finalized the checkpoint yet
+ assertEquals(6, count(QUEUE));
+
+ // we finalize the checkpoint: no more messages in the queue
+ reader.getCheckpointMark().finalizeCheckpoint();
+
+ assertEquals(0, count(QUEUE));
+ }
+
+ /**
+  * Counts the messages currently pending in the given queue by browsing it, i.e. without
+  * consuming them.
+  *
+  * @param queue the name of the queue to browse
+  * @return the number of messages enumerated by the queue browser
+  * @throws Exception if the JMS connection, session or browser cannot be created or closed
+  */
+ private int count(String queue) throws Exception {
+   Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD);
+   try {
+     connection.start();
+     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     QueueBrowser browser = session.createBrowser(session.createQueue(queue));
+     try {
+       // getEnumeration() returns a raw Enumeration; the elements are only counted, so a
+       // wildcard type avoids an unchecked conversion
+       Enumeration<?> messages = browser.getEnumeration();
+       int count = 0;
+       while (messages.hasMoreElements()) {
+         messages.nextElement();
+         count++;
+       }
+       return count;
+     } finally {
+       browser.close();
+     }
+   } finally {
+     // this helper is called several times per test: always release the broker connection
+     // (closing a JMS connection also closes the sessions created from it)
+     connection.close();
+   }
+ }
+
}
[2/2] beam git commit: [BEAM-2246] This closes #3073
Posted by jb...@apache.org.
[BEAM-2246] This closes #3073
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2df9dbd2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2df9dbd2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2df9dbd2
Branch: refs/heads/master
Commit: 2df9dbd24f6fd3bcfb14316859f4b38c38d95eca
Parents: 4884d48 a158fc1
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed May 31 21:27:34 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed May 31 21:27:34 2017 +0200
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/jms/JmsIO.java | 5 +-
.../org/apache/beam/sdk/io/jms/JmsIOTest.java | 78 ++++++++++++++++++++
2 files changed, 81 insertions(+), 2 deletions(-)
----------------------------------------------------------------------