You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2018/12/21 11:58:02 UTC
[beam] branch master updated: [BEAM-6212] Add MongoDbIO ordered option
This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8d611b4 [BEAM-6212] Add MongoDbIO ordered option
new 094586d Merge pull request #7256: [BEAM-6212] Add MongoDbIO ordered option
8d611b4 is described below
commit 8d611b4d75cd4102e755a0db8afe02b44749d8ad
Author: Chaim Turkel <cy...@gmail.com>
AuthorDate: Wed Dec 12 00:03:10 2018 +0200
[BEAM-6212] Add MongoDbIO ordered option
---
.../org/apache/beam/sdk/io/mongodb/MongoDbIO.java | 28 +++++++++++++++++++-
.../apache/beam/sdk/io/mongodb/MongoDbIOTest.java | 30 ++++++++++++++++++++++
2 files changed, 57 insertions(+), 1 deletion(-)
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 309a30d..5689661 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -23,12 +23,14 @@ import static com.mongodb.client.model.Projections.include;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.mongodb.BasicDBObject;
+import com.mongodb.MongoBulkWriteException;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.InsertManyOptions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -119,6 +121,7 @@ public class MongoDbIO {
.setSslEnabled(false)
.setIgnoreSSLCertificate(false)
.setSslInvalidHostNameAllowed(false)
+ .setOrdered(true)
.build();
}
@@ -612,6 +615,8 @@ public class MongoDbIO {
abstract boolean ignoreSSLCertificate();
+ abstract boolean ordered();
+
@Nullable
abstract String database();
@@ -636,6 +641,8 @@ public class MongoDbIO {
abstract Builder setIgnoreSSLCertificate(boolean value);
+ abstract Builder setOrdered(boolean value);
+
abstract Builder setDatabase(String database);
abstract Builder setCollection(String collection);
@@ -705,6 +712,17 @@ public class MongoDbIO {
return builder().setSslInvalidHostNameAllowed(invalidHostNameAllowed).build();
}
+ /**
+ * Enables or disables ordered bulk insertion (default: true).
+ *
+ * <p>The flag is passed to {@code InsertManyOptions.ordered(...)} when a batch is inserted. When
+ * it is {@code false}, a {@code MongoBulkWriteException} raised by that insert is caught and not
+ * rethrown, so the write step does not fail on it; when it is {@code true} the exception
+ * propagates.
+ *
+ * @param ordered whether batches are inserted as ordered bulk operations
+ * @return a {@code Write} built with the given ordered setting
+ * @see <a href=
+ * "https://github.com/mongodb/specifications/blob/master/source/crud/crud.rst#basic">
+ * specification of MongoDb CRUD operations</a>
+ */
+ public Write withOrdered(boolean ordered) {
+ return builder().setOrdered(ordered).build();
+ }
+
/** Enable ignoreSSLCertificate for ssl for connection (allow for self signed ceritificates). */
public Write withIgnoreSSLCertificate(boolean ignoreSSLCertificate) {
return builder().setIgnoreSSLCertificate(ignoreSSLCertificate).build();
@@ -746,6 +764,7 @@ public class MongoDbIO {
builder.add(DisplayData.item("sslEnable", sslEnabled()));
builder.add(DisplayData.item("sslInvalidHostNameAllowed", sslInvalidHostNameAllowed()));
builder.add(DisplayData.item("ignoreSSLCertificate", ignoreSSLCertificate()));
+ builder.add(DisplayData.item("ordered", ordered()));
builder.add(DisplayData.item("database", database()));
builder.add(DisplayData.item("collection", collection()));
builder.add(DisplayData.item("batchSize", batchSize()));
@@ -799,7 +818,14 @@ public class MongoDbIO {
}
MongoDatabase mongoDatabase = client.getDatabase(spec.database());
MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(spec.collection());
- mongoCollection.insertMany(batch);
+ try {
+ mongoCollection.insertMany(batch, new InsertManyOptions().ordered(spec.ordered()));
+ } catch (MongoBulkWriteException e) {
+ if (spec.ordered()) {
+ throw e;
+ }
+ }
+
batch.clear();
}
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
index 09d8d3a..1d14eef 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
@@ -368,4 +368,34 @@ public class MongoDbIOTest implements Serializable {
Assert.assertEquals(0, collection.count());
}
+
+ /**
+ * Writes the same document (identical {@code _id}) twice with {@code withOrdered(false)} and
+ * checks that the pipeline completes and exactly one document ends up in the collection.
+ */
+ @Test
+ public void testWriteUnordered() throws Exception {
+ Document doc =
+ Document.parse("{\"_id\":\"521df3a4300466f1f2b5ae82\",\"scientist\":\"Test %s\"}");
+
+ pipeline
+ .apply(Create.of(doc, doc))
+ .apply(
+ MongoDbIO.write()
+ .withUri("mongodb://localhost:" + port)
+ .withDatabase("test")
+ .withOrdered(false)
+ .withCollection("test"));
+ pipeline.run();
+
+ // Close the verification client when done so the test does not leak its connection.
+ try (MongoClient client = new MongoClient("localhost", port)) {
+ MongoCollection<Document> collection = client.getDatabase("test").getCollection("test");
+ // Only one of the two identical documents is expected to be stored.
+ assertEquals(1, collection.count());
+ }
+ }
}