You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2018/12/21 11:58:02 UTC

[beam] branch master updated: [BEAM-6212] Add MongoDbIO ordered option

This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d611b4  [BEAM-6212] Add MongoDbIO ordered option
     new 094586d  Merge pull request #7256: [BEAM-6212] Add MongoDbIO ordered option
8d611b4 is described below

commit 8d611b4d75cd4102e755a0db8afe02b44749d8ad
Author: Chaim Turkel <cy...@gmail.com>
AuthorDate: Wed Dec 12 00:03:10 2018 +0200

    [BEAM-6212] Add MongoDbIO ordered option
---
 .../org/apache/beam/sdk/io/mongodb/MongoDbIO.java  | 28 +++++++++++++++++++-
 .../apache/beam/sdk/io/mongodb/MongoDbIOTest.java  | 30 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 309a30d..5689661 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -23,12 +23,14 @@ import static com.mongodb.client.model.Projections.include;
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.mongodb.BasicDBObject;
+import com.mongodb.MongoBulkWriteException;
 import com.mongodb.MongoClient;
 import com.mongodb.MongoClientOptions;
 import com.mongodb.MongoClientURI;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoCursor;
 import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.InsertManyOptions;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -119,6 +121,7 @@ public class MongoDbIO {
         .setSslEnabled(false)
         .setIgnoreSSLCertificate(false)
         .setSslInvalidHostNameAllowed(false)
+        .setOrdered(true)
         .build();
   }
 
@@ -612,6 +615,8 @@ public class MongoDbIO {
 
     abstract boolean ignoreSSLCertificate();
 
+    abstract boolean ordered();
+
     @Nullable
     abstract String database();
 
@@ -636,6 +641,8 @@ public class MongoDbIO {
 
       abstract Builder setIgnoreSSLCertificate(boolean value);
 
+      abstract Builder setOrdered(boolean value);
+
       abstract Builder setDatabase(String database);
 
       abstract Builder setCollection(String collection);
@@ -705,6 +712,17 @@ public class MongoDbIO {
       return builder().setSslInvalidHostNameAllowed(invalidHostNameAllowed).build();
     }
 
+    /**
+     * Whether bulk inserts are ordered (default: true); if false, bulk write errors are swallowed.
+     *
+     * @see <a href=
+     *     "https://github.com/mongodb/specifications/blob/master/source/crud/crud.rst#basic">
+     *     specification of MongoDb CRUD operations</a>
+     */
+    public Write withOrdered(boolean ordered) {
+      return builder().setOrdered(ordered).build();
+    }
+
     /** Enable ignoreSSLCertificate for ssl for connection (allow for self signed ceritificates). */
     public Write withIgnoreSSLCertificate(boolean ignoreSSLCertificate) {
       return builder().setIgnoreSSLCertificate(ignoreSSLCertificate).build();
@@ -746,6 +764,7 @@ public class MongoDbIO {
       builder.add(DisplayData.item("sslEnable", sslEnabled()));
       builder.add(DisplayData.item("sslInvalidHostNameAllowed", sslInvalidHostNameAllowed()));
       builder.add(DisplayData.item("ignoreSSLCertificate", ignoreSSLCertificate()));
+      builder.add(DisplayData.item("ordered", ordered()));
       builder.add(DisplayData.item("database", database()));
       builder.add(DisplayData.item("collection", collection()));
       builder.add(DisplayData.item("batchSize", batchSize()));
@@ -799,7 +818,14 @@ public class MongoDbIO {
         }
         MongoDatabase mongoDatabase = client.getDatabase(spec.database());
         MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(spec.collection());
-        mongoCollection.insertMany(batch);
+        try {
+          mongoCollection.insertMany(batch, new InsertManyOptions().ordered(spec.ordered()));
+        } catch (MongoBulkWriteException e) {
+          if (spec.ordered()) {
+            throw e;
+          }
+        }
+
         batch.clear();
       }
 
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
index 09d8d3a..1d14eef 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
@@ -368,4 +368,34 @@ public class MongoDbIOTest implements Serializable {
 
     Assert.assertEquals(0, collection.count());
   }
+
+  @Test
+  public void testWriteUnordered() throws Exception {
+    Document doc =
+        Document.parse("{\"_id\":\"521df3a4300466f1f2b5ae82\",\"scientist\":\"Test %s\"}");
+    // Write the same document (identical _id) twice with ordered=false.
+    pipeline
+        .apply(Create.of(doc, doc))
+        .apply(
+            MongoDbIO.write()
+                .withUri("mongodb://localhost:" + port)
+                .withDatabase("test")
+                .withOrdered(false)
+                .withCollection("test"));
+    pipeline.run();
+    // Read the collection back through a direct client connection.
+    MongoClient client = new MongoClient("localhost", port);
+    MongoDatabase database = client.getDatabase("test");
+    MongoCollection collection = database.getCollection("test");
+
+    MongoCursor cursor = collection.find().iterator();
+    // Count the stored documents by walking the cursor.
+    int count = 0;
+    while (cursor.hasNext()) {
+      count = count + 1;
+      cursor.next();
+    }
+    // Only one document is expected to be stored for the duplicated _id.
+    assertEquals(1, count);
+  }
 }