You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/11/12 08:13:21 UTC

[1/2] incubator-beam git commit: [BEAM-856] Use free available port to start the test MongoDB instance

Repository: incubator-beam
Updated Branches:
  refs/heads/master 7d069a65b -> 8bfa08519


[BEAM-856] Use free available port to start the test MongoDB instance

[BEAM-856] Use random port for MongoDB instance in test, fix MongoClient init in estimatedSizeBytes and splitIntoBundles methods using provided URI


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dacd5233
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dacd5233
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dacd5233

Branch: refs/heads/master
Commit: dacd52334667a38955332ec03feee8079aeae571
Parents: 7d069a6
Author: Jean-Baptiste Onofr� <jb...@apache.org>
Authored: Mon Oct 31 17:27:15 2016 +0100
Committer: Davor Bonaci <da...@google.com>
Committed: Sat Nov 12 00:12:18 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/io/mongodb/MongoDbIO.java   |  4 +--
 .../sdk/io/mongodb/MongoDBGridFSIOTest.java     | 28 ++++++++++-----
 .../beam/sdk/io/mongodb/MongoDbIOTest.java      | 38 ++++++++++++++------
 3 files changed, 49 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dacd5233/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 20b9265..2729602 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -216,7 +216,7 @@ public class MongoDbIO {
 
     @Override
     public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) {
-      MongoClient mongoClient = new MongoClient();
+      MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()));
       MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database());
 
       // get the Mongo collStats object
@@ -230,7 +230,7 @@ public class MongoDbIO {
     @Override
     public List<BoundedSource<Document>> splitIntoBundles(long desiredBundleSizeBytes,
                                                 PipelineOptions options) {
-      MongoClient mongoClient = new MongoClient();
+      MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()));
       MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database());
 
       List<Document> splitKeys;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dacd5233/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
index 5f4d122..dc27ee2 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
@@ -26,6 +26,7 @@ import com.mongodb.gridfs.GridFSDBFile;
 import com.mongodb.gridfs.GridFSInputFile;
 
 import de.flapdoodle.embed.mongo.MongodExecutable;
+import de.flapdoodle.embed.mongo.MongodProcess;
 import de.flapdoodle.embed.mongo.MongodStarter;
 import de.flapdoodle.embed.mongo.config.IMongodConfig;
 import de.flapdoodle.embed.mongo.config.MongoCmdOptionsBuilder;
@@ -45,6 +46,7 @@ import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.Serializable;
+import java.net.ServerSocket;
 import java.util.List;
 import java.util.Random;
 import java.util.Scanner;
@@ -82,14 +84,21 @@ public class MongoDBGridFSIOTest implements Serializable {
   private static final Logger LOGGER = LoggerFactory.getLogger(MongoDBGridFSIOTest.class);
 
   private static final String MONGODB_LOCATION = "target/mongodb";
-  private static final int PORT = 27017;
   private static final String DATABASE = "gridfs";
 
+  private static final transient MongodStarter mongodStarter = MongodStarter.getDefaultInstance();
+
   private static transient MongodExecutable mongodExecutable;
+  private static transient MongodProcess mongodProcess;
+
+  private static int port;
 
   @BeforeClass
   public static void setup() throws Exception {
-    LOGGER.info("Starting MongoDB embedded instance");
+    try (ServerSocket serverSocket = new ServerSocket(0)) {
+      port = serverSocket.getLocalPort();
+    }
+    LOGGER.info("Starting MongoDB embedded instance on {}", port);
     try {
       Files.forceDelete(new File(MONGODB_LOCATION));
     } catch (Exception e) {
@@ -100,7 +109,7 @@ public class MongoDBGridFSIOTest implements Serializable {
         .version(Version.Main.PRODUCTION)
         .configServer(false)
         .replication(new Storage(MONGODB_LOCATION, null, 0))
-        .net(new Net("localhost", PORT, Network.localhostIsIPv6()))
+        .net(new Net("localhost", port, Network.localhostIsIPv6()))
         .cmdOptions(new MongoCmdOptionsBuilder()
             .syncDelay(10)
             .useNoPrealloc(true)
@@ -108,12 +117,12 @@ public class MongoDBGridFSIOTest implements Serializable {
             .useNoJournal(true)
             .build())
         .build();
-    mongodExecutable = MongodStarter.getDefaultInstance().prepare(mongodConfig);
-    mongodExecutable.start();
+    mongodExecutable = mongodStarter.prepare(mongodConfig);
+    mongodProcess = mongodExecutable.start();
 
     LOGGER.info("Insert test data");
 
-    Mongo client = new Mongo("localhost", PORT);
+    Mongo client = new Mongo("localhost", port);
     DB database = client.getDB(DATABASE);
     GridFS gridfs = new GridFS(database);
 
@@ -159,6 +168,7 @@ public class MongoDBGridFSIOTest implements Serializable {
   @AfterClass
   public static void stop() throws Exception {
     LOGGER.info("Stopping MongoDB instance");
+    mongodProcess.stop();
     mongodExecutable.stop();
   }
 
@@ -169,7 +179,7 @@ public class MongoDBGridFSIOTest implements Serializable {
 
     PCollection<String> output = pipeline.apply(
         MongoDbGridFSIO.<String>read()
-            .withUri("mongodb://localhost:" + PORT)
+            .withUri("mongodb://localhost:" + port)
             .withDatabase(DATABASE));
 
     PAssert.thatSingleton(
@@ -199,7 +209,7 @@ public class MongoDBGridFSIOTest implements Serializable {
 
     PCollection<KV<String, Integer>> output = pipeline.apply(
         MongoDbGridFSIO.<KV<String, Integer>>read()
-            .withUri("mongodb://localhost:" + PORT)
+            .withUri("mongodb://localhost:" + port)
             .withDatabase(DATABASE)
             .withBucket("mapBucket")
             .withParser(new MongoDbGridFSIO.Parser<KV<String, Integer>>() {
@@ -246,7 +256,7 @@ public class MongoDBGridFSIOTest implements Serializable {
   public void testSplit() throws Exception {
     PipelineOptions options = PipelineOptionsFactory.create();
     MongoDbGridFSIO.Read<String> read = MongoDbGridFSIO.<String>read()
-        .withUri("mongodb://localhost:" + PORT)
+        .withUri("mongodb://localhost:" + port)
         .withDatabase(DATABASE);
 
     BoundedGridFSSource src = new BoundedGridFSSource(read, null);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dacd5233/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
index 9a88267..129e81c 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
@@ -25,6 +25,7 @@ import com.mongodb.client.MongoCursor;
 import com.mongodb.client.MongoDatabase;
 
 import de.flapdoodle.embed.mongo.MongodExecutable;
+import de.flapdoodle.embed.mongo.MongodProcess;
 import de.flapdoodle.embed.mongo.MongodStarter;
 import de.flapdoodle.embed.mongo.config.IMongodConfig;
 import de.flapdoodle.embed.mongo.config.MongoCmdOptionsBuilder;
@@ -37,6 +38,7 @@ import de.flapdoodle.embed.process.runtime.Network;
 
 import java.io.File;
 import java.io.Serializable;
+import java.net.ServerSocket;
 import java.util.ArrayList;
 
 import org.apache.beam.sdk.testing.NeedsRunner;
@@ -54,6 +56,7 @@ import org.bson.Document;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
@@ -67,15 +70,29 @@ public class MongoDbIOTest implements Serializable {
   private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbIOTest.class);
 
   private static final String MONGODB_LOCATION = "target/mongodb";
-  private static final int PORT = 27017;
   private static final String DATABASE = "beam";
   private static final String COLLECTION = "test";
 
+  private static final MongodStarter mongodStarter = MongodStarter.getDefaultInstance();
+
   private transient MongodExecutable mongodExecutable;
+  private transient MongodProcess mongodProcess;
+
+  private static int port;
+
+  /**
+   * Looking for an available network port.
+   */
+  @BeforeClass
+  public static void availablePort() throws Exception {
+    try (ServerSocket serverSocket = new ServerSocket(0)) {
+      port = serverSocket.getLocalPort();
+    }
+  }
 
   @Before
   public void setup() throws Exception {
-    LOGGER.info("Starting MongoDB embedded instance");
+    LOGGER.info("Starting MongoDB embedded instance on {}", port);
     try {
       Files.forceDelete(new File(MONGODB_LOCATION));
     } catch (Exception e) {
@@ -86,7 +103,7 @@ public class MongoDbIOTest implements Serializable {
         .version(Version.Main.PRODUCTION)
         .configServer(false)
         .replication(new Storage(MONGODB_LOCATION, null, 0))
-        .net(new Net("localhost", PORT, Network.localhostIsIPv6()))
+        .net(new Net("localhost", port, Network.localhostIsIPv6()))
         .cmdOptions(new MongoCmdOptionsBuilder()
             .syncDelay(10)
             .useNoPrealloc(true)
@@ -94,12 +111,12 @@ public class MongoDbIOTest implements Serializable {
             .useNoJournal(true)
             .build())
         .build();
-    mongodExecutable = MongodStarter.getDefaultInstance().prepare(mongodConfig);
-    mongodExecutable.start();
+    mongodExecutable = mongodStarter.prepare(mongodConfig);
+    mongodProcess = mongodExecutable.start();
 
     LOGGER.info("Insert test data");
 
-    MongoClient client = new MongoClient("localhost", PORT);
+    MongoClient client = new MongoClient("localhost", port);
     MongoDatabase database = client.getDatabase(DATABASE);
 
     MongoCollection collection = database.getCollection(COLLECTION);
@@ -119,6 +136,7 @@ public class MongoDbIOTest implements Serializable {
   @After
   public void stop() throws Exception {
     LOGGER.info("Stopping MongoDB instance");
+    mongodProcess.stop();
     mongodExecutable.stop();
   }
 
@@ -129,7 +147,7 @@ public class MongoDbIOTest implements Serializable {
 
     PCollection<Document> output = pipeline.apply(
         MongoDbIO.read()
-          .withUri("mongodb://localhost:" + PORT)
+          .withUri("mongodb://localhost:" + port)
           .withDatabase(DATABASE)
           .withCollection(COLLECTION));
 
@@ -163,7 +181,7 @@ public class MongoDbIOTest implements Serializable {
 
     PCollection<Document> output = pipeline.apply(
         MongoDbIO.read()
-        .withUri("mongodb://localhost:" + PORT)
+        .withUri("mongodb://localhost:" + port)
         .withDatabase(DATABASE)
         .withCollection(COLLECTION)
         .withFilter("{\"scientist\":\"Einstein\"}"));
@@ -184,12 +202,12 @@ public class MongoDbIOTest implements Serializable {
       data.add(Document.parse(String.format("{\"scientist\":\"Test %s\"}", i)));
     }
     pipeline.apply(Create.of(data))
-        .apply(MongoDbIO.write().withUri("mongodb://localhost:" + PORT).withDatabase("test")
+        .apply(MongoDbIO.write().withUri("mongodb://localhost:" + port).withDatabase("test")
             .withCollection("test"));
 
     pipeline.run();
 
-    MongoClient client = new MongoClient("localhost", PORT);
+    MongoClient client = new MongoClient("localhost", port);
     MongoDatabase database = client.getDatabase("test");
     MongoCollection collection = database.getCollection("test");
 


[2/2] incubator-beam git commit: This closes #1236

Posted by da...@apache.org.
This closes #1236


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8bfa0851
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8bfa0851
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8bfa0851

Branch: refs/heads/master
Commit: 8bfa0851930f38fd1f63b3b17b7fb15f0f88beda
Parents: 7d069a6 dacd523
Author: Davor Bonaci <da...@google.com>
Authored: Sat Nov 12 00:12:40 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Sat Nov 12 00:12:40 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/io/mongodb/MongoDbIO.java   |  4 +--
 .../sdk/io/mongodb/MongoDBGridFSIOTest.java     | 28 ++++++++++-----
 .../beam/sdk/io/mongodb/MongoDbIOTest.java      | 38 ++++++++++++++------
 3 files changed, 49 insertions(+), 21 deletions(-)
----------------------------------------------------------------------