You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/08/11 05:59:39 UTC
[1/2] beam git commit: [BEAM-2684] Fix flaky AmqpIOTest by
introducing ActiveMQ AMQP broker instead of peer-to-peer mode
Repository: beam
Updated Branches:
refs/heads/master 9088a3e39 -> 07e8cd5fc
[BEAM-2684] Fix flaky AmqpIOTest by introducing ActiveMQ AMQP broker instead of peer-to-peer mode
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/02eb0913
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/02eb0913
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/02eb0913
Branch: refs/heads/master
Commit: 02eb09135ce1ae234052caf7ff2787256908f918
Parents: 9088a3e
Author: Alex Filatov <al...@users.noreply.github.com>
Authored: Thu Aug 10 23:02:37 2017 +0300
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Fri Aug 11 07:20:06 2017 +0200
----------------------------------------------------------------------
sdks/java/io/amqp/pom.xml | 22 ++++
.../org/apache/beam/sdk/io/amqp/AmqpIOTest.java | 112 ++++++++-----------
2 files changed, 66 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/02eb0913/sdks/java/io/amqp/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/pom.xml b/sdks/java/io/amqp/pom.xml
index 4369bb8..c28436b 100644
--- a/sdks/java/io/amqp/pom.xml
+++ b/sdks/java/io/amqp/pom.xml
@@ -30,6 +30,10 @@
<name>Apache Beam :: SDKs :: Java :: IO :: AMQP</name>
<description>IO to read and write using AMQP 1.0 protocol (http://www.amqp.org).</description>
+ <properties>
+ <activemq.version>5.13.1</activemq.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
@@ -96,6 +100,24 @@
<artifactId>beam-runners-direct-java</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-broker</artifactId>
+ <version>${activemq.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-amqp</artifactId>
+ <version>${activemq.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq.tooling</groupId>
+ <artifactId>activemq-junit</artifactId>
+ <version>${activemq.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/02eb0913/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java
index c8fe4e8..947929f 100644
--- a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java
+++ b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java
@@ -20,11 +20,11 @@ package org.apache.beam.sdk.io.amqp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.activemq.junit.EmbeddedActiveMQBroker;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
@@ -33,7 +33,6 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.messenger.Messenger;
-import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -49,95 +48,57 @@ public class AmqpIOTest {
private static final Logger LOG = LoggerFactory.getLogger(AmqpIOTest.class);
- private int port;
-
@Rule public TestPipeline pipeline = TestPipeline.create();
- @Before
- public void findFreeNetworkPort() throws Exception {
- LOG.info("Finding free network port");
- ServerSocket socket = new ServerSocket(0);
- port = socket.getLocalPort();
- socket.close();
- }
+ @Rule public EmbeddedAmqpBroker broker = new EmbeddedAmqpBroker();
@Test
public void testRead() throws Exception {
PCollection<Message> output = pipeline.apply(AmqpIO.read()
.withMaxNumRecords(100)
- .withAddresses(Collections.singletonList("amqp://~localhost:" + port)));
+ .withAddresses(Collections.singletonList(broker.getQueueUri("testRead"))));
PAssert.thatSingleton(output.apply(Count.<Message>globally())).isEqualTo(100L);
- Thread sender = new Thread() {
- public void run() {
- try {
- Thread.sleep(500);
- Messenger sender = Messenger.Factory.create();
- sender.start();
- for (int i = 0; i < 100; i++) {
- Message message = Message.Factory.create();
- message.setAddress("amqp://localhost:" + port);
- message.setBody(new AmqpValue("Test " + i));
- sender.put(message);
- sender.send();
- }
- sender.stop();
- } catch (Exception e) {
- LOG.error("Sender error", e);
- }
- }
- };
- try {
- sender.start();
- pipeline.run();
- } finally {
- sender.join();
+ Messenger sender = Messenger.Factory.create();
+ sender.start();
+ for (int i = 0; i < 100; i++) {
+ Message message = Message.Factory.create();
+ message.setAddress(broker.getQueueUri("testRead"));
+ message.setBody(new AmqpValue("Test " + i));
+ sender.put(message);
+ sender.send();
}
+ sender.stop();
+
+ pipeline.run();
}
@Test
public void testWrite() throws Exception {
- final List<String> received = new ArrayList<>();
- Thread receiver = new Thread() {
- @Override
- public void run() {
- try {
- Messenger messenger = Messenger.Factory.create();
- messenger.start();
- messenger.subscribe("amqp://~localhost:" + port);
- while (received.size() < 100) {
- messenger.recv();
- while (messenger.incoming() > 0) {
- Message message = messenger.get();
- LOG.info("Received: " + message.getBody().toString());
- received.add(message.getBody().toString());
- }
- }
- messenger.stop();
- } catch (Exception e) {
- LOG.error("Receiver error", e);
- }
- }
- };
- LOG.info("Starting AMQP receiver");
- receiver.start();
-
List<Message> data = new ArrayList<>();
for (int i = 0; i < 100; i++) {
Message message = Message.Factory.create();
message.setBody(new AmqpValue("Test " + i));
- message.setAddress("amqp://localhost:" + port);
+ message.setAddress(broker.getQueueUri("testWrite"));
message.setSubject("test");
data.add(message);
}
pipeline.apply(Create.of(data).withCoder(AmqpMessageCoder.of())).apply(AmqpIO.write());
- LOG.info("Starting pipeline");
- try {
- pipeline.run();
- } finally {
- LOG.info("Join receiver thread");
- receiver.join();
+ pipeline.run().waitUntilFinish();
+
+ List<String> received = new ArrayList<>();
+ Messenger messenger = Messenger.Factory.create();
+ messenger.start();
+ messenger.subscribe(broker.getQueueUri("testWrite"));
+ while (received.size() < 100) {
+ messenger.recv();
+ while (messenger.incoming() > 0) {
+ Message message = messenger.get();
+ LOG.info("Received: " + message.getBody().toString());
+ received.add(message.getBody().toString());
+ }
}
+ messenger.stop();
assertEquals(100, received.size());
for (int i = 0; i < 100; i++) {
@@ -145,4 +106,19 @@ public class AmqpIOTest {
}
}
+ private static class EmbeddedAmqpBroker extends EmbeddedActiveMQBroker {
+ @Override
+ protected void configure() {
+ try {
+ getBrokerService().addConnector("amqp://localhost:0");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String getQueueUri(String queueName) {
+ return getBrokerService().getDefaultSocketURIString() + "/" + queueName;
+ }
+ }
+
}
[2/2] beam git commit: [BEAM-2684] This closes #3714
Posted by jb...@apache.org.
[BEAM-2684] This closes #3714
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/07e8cd5f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/07e8cd5f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/07e8cd5f
Branch: refs/heads/master
Commit: 07e8cd5fce8bd7aff0b6b7304412c8d3e2abf997
Parents: 9088a3e 02eb091
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Fri Aug 11 07:57:52 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Fri Aug 11 07:57:52 2017 +0200
----------------------------------------------------------------------
sdks/java/io/amqp/pom.xml | 22 ++++
.../org/apache/beam/sdk/io/amqp/AmqpIOTest.java | 112 ++++++++-----------
2 files changed, 66 insertions(+), 68 deletions(-)
----------------------------------------------------------------------