You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/05/31 19:27:49 UTC

[1/2] beam git commit: [BEAM-2246] Use CLIENT_ACK instead of AUTO_ACK in JmsIO

Repository: beam
Updated Branches:
  refs/heads/master 4884d4867 -> 2df9dbd24


[BEAM-2246] Use CLIENT_ACK instead of AUTO_ACK in JmsIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a158fc17
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a158fc17
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a158fc17

Branch: refs/heads/master
Commit: a158fc178e1297f04f4f18975383ec1dc69bc0d8
Parents: 4884d48
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed May 10 07:39:56 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed May 31 21:24:22 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/jms/JmsIO.java  |  5 +-
 .../org/apache/beam/sdk/io/jms/JmsIOTest.java   | 78 ++++++++++++++++++++
 2 files changed, 81 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a158fc17/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index b8355ad..c5e5150 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -379,7 +379,8 @@ public class JmsIO {
 
   }
 
-  private static class UnboundedJmsReader extends UnboundedReader<JmsRecord> {
+  @VisibleForTesting
+  static class UnboundedJmsReader extends UnboundedReader<JmsRecord> {
 
     private UnboundedJmsSource source;
     private JmsCheckpointMark checkpointMark;
@@ -421,7 +422,7 @@ public class JmsIO {
       }
 
       try {
-        this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        this.session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
       } catch (Exception e) {
         throw new IOException("Error creating JMS session", e);
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/a158fc17/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
index 7edda1a..43c050e 100644
--- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
+++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
@@ -23,10 +23,12 @@ import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Enumeration;
 import java.util.List;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -34,6 +36,7 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.QueueBrowser;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import org.apache.activemq.ActiveMQConnectionFactory;
@@ -71,6 +74,7 @@ public class JmsIOTest {
 
   private BrokerService broker;
   private ConnectionFactory connectionFactory;
+  private ConnectionFactory connectionFactoryWithoutPrefetch;
 
   @Rule
   public final transient TestPipeline pipeline = TestPipeline.create();
@@ -98,6 +102,8 @@ public class JmsIOTest {
 
     // create JMS connection factory
     connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
+    connectionFactoryWithoutPrefetch =
+        new ActiveMQConnectionFactory(BROKER_URL + "?jms.prefetchPolicy.all=0");
   }
 
   @After
@@ -236,4 +242,76 @@ public class JmsIOTest {
     assertEquals(1, splits.size());
   }
 
+  @Test
+  public void testCheckpointMark() throws Exception {
+    // Checks that messages read by the reader stay on the queue until the checkpoint mark is
+    // finalized. The reader uses a connection factory with the ActiveMQ prefetch set to 0:
+    // with prefetch, the broker 'pushes' messages into a consumer-side buffer, so there can be
+    // some latency between the receiveNoWait() call used by the consumer and the buffer being
+    // populated by the broker. With a prefetch of 0 the consumer polls the broker for each
+    // message, which is exactly what we want for the test.
+    Connection connection = connectionFactoryWithoutPrefetch.createConnection(USERNAME, PASSWORD);
+    connection.start();
+    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    MessageProducer producer = session.createProducer(session.createQueue(QUEUE));
+    for (int i = 0; i < 10; i++) {
+      producer.send(session.createTextMessage("test " + i));
+    }
+    producer.close();
+    session.close();
+    connection.close();
+
+    JmsIO.Read spec = JmsIO.read()
+        .withConnectionFactory(connectionFactoryWithoutPrefetch)
+        .withUsername(USERNAME)
+        .withPassword(PASSWORD)
+        .withQueue(QUEUE);
+    JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+    JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
+
+    // start the reader and move to the first record
+    assertTrue(reader.start());
+
+    // read 3 more messages (NB: start() already read the first one, so 4 have been read)
+    for (int i = 0; i < 3; i++) {
+      assertTrue(reader.advance());
+    }
+
+    // all 10 messages are still pending in the queue (no ACK yet)
+    assertEquals(10, count(QUEUE));
+
+    // finalize the checkpoint
+    reader.getCheckpointMark().finalizeCheckpoint();
+
+    // finalizing the checkpoint acks the 4 messages read so far, so they are no longer pending
+    assertEquals(6, count(QUEUE));
+
+    // read the 6 remaining messages
+    for (int i = 0; i < 6; i++) {
+      assertTrue(reader.advance());
+    }
+
+    // still 6 pending messages, as the checkpoint has not been finalized yet
+    assertEquals(6, count(QUEUE));
+
+    // finalize the checkpoint again: no more messages in the queue
+    reader.getCheckpointMark().finalizeCheckpoint();
+
+    assertEquals(0, count(QUEUE));
+  }
+
+  private int count(String queue) throws Exception { // number of messages browsable on queue
+    Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD);
+    connection.start();
+    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    QueueBrowser browser = session.createBrowser(session.createQueue(queue));
+    Enumeration<Message> messages = browser.getEnumeration();
+    int count = 0;
+    while (messages.hasMoreElements()) {
+      messages.nextElement();
+      count++;
+    }
+    return count; // NOTE(review): connection, session and browser are not closed in this method
+  }
+
 }


[2/2] beam git commit: [BEAM-2246] This closes #3073

Posted by jb...@apache.org.
[BEAM-2246] This closes #3073


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2df9dbd2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2df9dbd2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2df9dbd2

Branch: refs/heads/master
Commit: 2df9dbd24f6fd3bcfb14316859f4b38c38d95eca
Parents: 4884d48 a158fc1
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed May 31 21:27:34 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed May 31 21:27:34 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/jms/JmsIO.java  |  5 +-
 .../org/apache/beam/sdk/io/jms/JmsIOTest.java   | 78 ++++++++++++++++++++
 2 files changed, 81 insertions(+), 2 deletions(-)
----------------------------------------------------------------------