You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/06/19 19:24:16 UTC
[1/2] beam git commit: [BEAM-975] Improve default connection options,
javadoc and style in MongoDbIO
Repository: beam
Updated Branches:
refs/heads/master 1476f3412 -> b400f4a6f
[BEAM-975] Improve default connection options, javadoc and style in MongoDbIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/87be64e9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/87be64e9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/87be64e9
Branch: refs/heads/master
Commit: 87be64e9817da5e5c86a243471021268d6281b33
Parents: 1476f34
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Fri May 12 15:21:49 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Mon Jun 19 21:23:11 2017 +0200
----------------------------------------------------------------------
.../apache/beam/sdk/io/mongodb/MongoDbIO.java | 315 +++++++++++++++----
.../beam/sdk/io/mongodb/MongoDbIOTest.java | 37 +++
2 files changed, 283 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/87be64e9/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 620df74..04d9975 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -18,12 +18,13 @@
package org.apache.beam.sdk.io.mongodb;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
+import com.mongodb.MongoClientOptions;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
@@ -100,12 +101,20 @@ public class MongoDbIO {
/** Read data from MongoDB. */
public static Read read() {
- return new AutoValue_MongoDbIO_Read.Builder().setNumSplits(0).build();
+ return new AutoValue_MongoDbIO_Read.Builder()
+ .setKeepAlive(true)
+ .setMaxConnectionIdleTime(60000)
+ .setNumSplits(0)
+ .build();
}
/** Write data to MongoDB. */
public static Write write() {
- return new AutoValue_MongoDbIO_Write.Builder().setBatchSize(1024L).build();
+ return new AutoValue_MongoDbIO_Write.Builder()
+ .setKeepAlive(true)
+ .setMaxConnectionIdleTime(60000)
+ .setBatchSize(1024L)
+ .build();
}
private MongoDbIO() {
@@ -117,16 +126,20 @@ public class MongoDbIO {
@AutoValue
public abstract static class Read extends PTransform<PBegin, PCollection<Document>> {
@Nullable abstract String uri();
+ abstract boolean keepAlive();
+ abstract int maxConnectionIdleTime();
@Nullable abstract String database();
@Nullable abstract String collection();
@Nullable abstract String filter();
abstract int numSplits();
- abstract Builder toBuilder();
+ abstract Builder builder();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setUri(String uri);
+ abstract Builder setKeepAlive(boolean keepAlive);
+ abstract Builder setMaxConnectionIdleTime(int maxConnectionIdleTime);
abstract Builder setDatabase(String database);
abstract Builder setCollection(String collection);
abstract Builder setFilter(String filter);
@@ -135,31 +148,94 @@ public class MongoDbIO {
}
/**
- * Example documentation for withUri.
+     * Define the location of the MongoDB instances using a URI. The URI describes the hosts to
+ * be used and some options.
+ *
+ * <p>The format of the URI is:
+ *
+ * <pre>{@code
+     * mongodb://[username:password@]host1[:port1][,...[,hostN[:portN]]][/[database][?options]]
+ * }</pre>
+ *
+ * <p>Where:
+ * <ul>
+ * <li>{@code mongodb://} is a required prefix to identify that this is a string in the
+ * standard connection format.</li>
+ * <li>{@code username:password@} are optional. If given, the driver will attempt to
+ * login to a database after connecting to a database server. For some authentication
+ * mechanisms, only the username is specified and the password is not, in which case
+ * the ":" after the username is left off as well.</li>
+ * <li>{@code host1} is the only required part of the URI. It identifies a server
+ * address to connect to.</li>
+ * <li>{@code :portX} is optional and defaults to {@code :27017} if not provided.</li>
+ * <li>{@code /database} is the name of the database to login to and thus is only
+ * relevant if the {@code username:password@} syntax is used. If not specified, the
+     *   "admin" database will be used by default. It has to match the database you
+     *   specify with {@link Read#withDatabase(String)}.</li>
+ * <li>{@code ?options} are connection options. Note that if {@code database} is absent
+ * there is still a {@code /} required between the last {@code host} and the {@code ?}
+ * introducing the options. Options are name=value pairs and the pairs are separated by
+     *   "{@code &}". The {@code KeepAlive} connection option can't be passed via the URI;
+     *   instead, use {@link Read#withKeepAlive(boolean)}. The same applies to the
+     *   {@code MaxConnectionIdleTime} connection option, which is set via
+     *   {@link Read#withMaxConnectionIdleTime(int)}.
+ * </li>
+ * </ul>
*/
public Read withUri(String uri) {
- checkNotNull(uri);
- return toBuilder().setUri(uri).build();
+ checkArgument(uri != null, "MongoDbIO.read().withUri(uri) called with null uri");
+ return builder().setUri(uri).build();
+ }
+
+ /**
+ * Sets whether socket keep alive is enabled.
+ */
+ public Read withKeepAlive(boolean keepAlive) {
+ return builder().setKeepAlive(keepAlive).build();
+ }
+
+ /**
+     * Sets the maximum idle time, in milliseconds, for a pooled connection.
+ */
+ public Read withMaxConnectionIdleTime(int maxConnectionIdleTime) {
+ return builder().setMaxConnectionIdleTime(maxConnectionIdleTime).build();
}
+ /**
+ * Sets the database to use.
+ */
public Read withDatabase(String database) {
- checkNotNull(database);
- return toBuilder().setDatabase(database).build();
+ checkArgument(database != null, "MongoDbIO.read().withDatabase(database) called with null"
+ + " database");
+ return builder().setDatabase(database).build();
}
+ /**
+ * Sets the collection to consider in the database.
+ */
public Read withCollection(String collection) {
- checkNotNull(collection);
- return toBuilder().setCollection(collection).build();
+ checkArgument(collection != null, "MongoDbIO.read().withCollection(collection) called "
+ + "with null collection");
+ return builder().setCollection(collection).build();
}
+ /**
+ * Sets a filter on the documents in a collection.
+ */
public Read withFilter(String filter) {
- checkNotNull(filter);
- return toBuilder().setFilter(filter).build();
+ checkArgument(filter != null, "MongoDbIO.read().withFilter(filter) called with null "
+ + "filter");
+ return builder().setFilter(filter).build();
}
+ /**
+ * Sets the user defined number of splits.
+ */
public Read withNumSplits(int numSplits) {
- checkArgument(numSplits >= 0);
- return toBuilder().setNumSplits(numSplits).build();
+ checkArgument(numSplits >= 0, "MongoDbIO.read().withNumSplits(numSplits) called with "
+ + "invalid number. The number of splits has to be a positive value (currently %d)",
+ numSplits);
+ return builder().setNumSplits(numSplits).build();
}
@Override
@@ -169,15 +245,19 @@ public class MongoDbIO {
@Override
public void validate(PipelineOptions options) {
- checkNotNull(uri(), "uri");
- checkNotNull(database(), "database");
- checkNotNull(collection(), "collection");
+ checkState(uri() != null, "MongoDbIO.read() requires an URI to be set via withUri(uri)");
+ checkState(database() != null, "MongoDbIO.read() requires a database to be set via "
+ + "withDatabase(database)");
+ checkState(collection() != null, "MongoDbIO.read() requires a collection to be set via "
+ + "withCollection(collection)");
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.add(DisplayData.item("uri", uri()));
+ builder.add(DisplayData.item("keepAlive", keepAlive()));
+ builder.add(DisplayData.item("maxConnectionIdleTime", maxConnectionIdleTime()));
builder.add(DisplayData.item("database", database()));
builder.add(DisplayData.item("collection", collection()));
builder.addIfNotNull(DisplayData.item("filter", filter()));
@@ -218,61 +298,71 @@ public class MongoDbIO {
@Override
public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) {
- MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()));
- MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database());
+ try (MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()))) {
+ return getEstimatedSizeBytes(mongoClient, spec.database(), spec.collection());
+ }
+ }
+
+ private long getEstimatedSizeBytes(MongoClient mongoClient,
+ String database,
+ String collection) {
+ MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
// get the Mongo collStats object
// it gives the size for the entire collection
BasicDBObject stat = new BasicDBObject();
- stat.append("collStats", spec.collection());
+ stat.append("collStats", collection);
Document stats = mongoDatabase.runCommand(stat);
+
return stats.get("size", Number.class).longValue();
}
@Override
public List<BoundedSource<Document>> split(long desiredBundleSizeBytes,
PipelineOptions options) {
- MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()));
- MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database());
-
- List<Document> splitKeys;
- if (spec.numSplits() > 0) {
- // the user defines his desired number of splits
- // calculate the batch size
- long estimatedSizeBytes = getEstimatedSizeBytes(options);
- desiredBundleSizeBytes = estimatedSizeBytes / spec.numSplits();
- }
+ try (MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()))) {
+ MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database());
+
+ List<Document> splitKeys;
+ if (spec.numSplits() > 0) {
+ // the user defines his desired number of splits
+ // calculate the batch size
+ long estimatedSizeBytes = getEstimatedSizeBytes(mongoClient,
+ spec.database(), spec.collection());
+ desiredBundleSizeBytes = estimatedSizeBytes / spec.numSplits();
+ }
- // the desired batch size is small, using default chunk size of 1MB
- if (desiredBundleSizeBytes < 1024 * 1024) {
- desiredBundleSizeBytes = 1 * 1024 * 1024;
- }
+ // the desired batch size is small, using default chunk size of 1MB
+ if (desiredBundleSizeBytes < 1024 * 1024) {
+ desiredBundleSizeBytes = 1 * 1024 * 1024;
+ }
- // now we have the batch size (provided by user or provided by the runner)
- // we use Mongo splitVector command to get the split keys
- BasicDBObject splitVectorCommand = new BasicDBObject();
- splitVectorCommand.append("splitVector", spec.database() + "." + spec.collection());
- splitVectorCommand.append("keyPattern", new BasicDBObject().append("_id", 1));
- splitVectorCommand.append("force", false);
- // maxChunkSize is the Mongo partition size in MB
- LOG.debug("Splitting in chunk of {} MB", desiredBundleSizeBytes / 1024 / 1024);
- splitVectorCommand.append("maxChunkSize", desiredBundleSizeBytes / 1024 / 1024);
- Document splitVectorCommandResult = mongoDatabase.runCommand(splitVectorCommand);
- splitKeys = (List<Document>) splitVectorCommandResult.get("splitKeys");
-
- List<BoundedSource<Document>> sources = new ArrayList<>();
- if (splitKeys.size() < 1) {
- LOG.debug("Split keys is low, using an unique source");
- sources.add(this);
- return sources;
- }
+ // now we have the batch size (provided by user or provided by the runner)
+ // we use Mongo splitVector command to get the split keys
+ BasicDBObject splitVectorCommand = new BasicDBObject();
+ splitVectorCommand.append("splitVector", spec.database() + "." + spec.collection());
+ splitVectorCommand.append("keyPattern", new BasicDBObject().append("_id", 1));
+ splitVectorCommand.append("force", false);
+ // maxChunkSize is the Mongo partition size in MB
+ LOG.debug("Splitting in chunk of {} MB", desiredBundleSizeBytes / 1024 / 1024);
+ splitVectorCommand.append("maxChunkSize", desiredBundleSizeBytes / 1024 / 1024);
+ Document splitVectorCommandResult = mongoDatabase.runCommand(splitVectorCommand);
+ splitKeys = (List<Document>) splitVectorCommandResult.get("splitKeys");
+
+ List<BoundedSource<Document>> sources = new ArrayList<>();
+ if (splitKeys.size() < 1) {
+ LOG.debug("Split keys is low, using an unique source");
+ sources.add(this);
+ return sources;
+ }
- LOG.debug("Number of splits is {}", splitKeys.size());
- for (String shardFilter : splitKeysToFilters(splitKeys, spec.filter())) {
- sources.add(new BoundedMongoDbSource(spec.withFilter(shardFilter)));
- }
+ LOG.debug("Number of splits is {}", splitKeys.size());
+ for (String shardFilter : splitKeysToFilters(splitKeys, spec.filter())) {
+ sources.add(new BoundedMongoDbSource(spec.withFilter(shardFilter)));
+ }
- return sources;
+ return sources;
+ }
}
/**
@@ -367,7 +457,10 @@ public class MongoDbIO {
@Override
public boolean start() {
Read spec = source.spec;
- client = new MongoClient(new MongoClientURI(spec.uri()));
+ MongoClientOptions.Builder optionsBuilder = new MongoClientOptions.Builder();
+ optionsBuilder.maxConnectionIdleTime(spec.maxConnectionIdleTime());
+ optionsBuilder.socketKeepAlive(spec.keepAlive());
+ client = new MongoClient(new MongoClientURI(spec.uri(), optionsBuilder));
MongoDatabase mongoDatabase = client.getDatabase(spec.database());
@@ -426,36 +519,106 @@ public class MongoDbIO {
*/
@AutoValue
public abstract static class Write extends PTransform<PCollection<Document>, PDone> {
+
@Nullable abstract String uri();
+ abstract boolean keepAlive();
+ abstract int maxConnectionIdleTime();
@Nullable abstract String database();
@Nullable abstract String collection();
abstract long batchSize();
- abstract Builder toBuilder();
+ abstract Builder builder();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setUri(String uri);
+ abstract Builder setKeepAlive(boolean keepAlive);
+ abstract Builder setMaxConnectionIdleTime(int maxConnectionIdleTime);
abstract Builder setDatabase(String database);
abstract Builder setCollection(String collection);
abstract Builder setBatchSize(long batchSize);
abstract Write build();
}
+ /**
+     * Define the location of the MongoDB instances using a URI. The URI describes the hosts to
+ * be used and some options.
+ *
+ * <p>The format of the URI is:
+ *
+ * <pre>{@code
+     * mongodb://[username:password@]host1[:port1][,...[,hostN[:portN]]][/[database][?options]]
+ * }</pre>
+ *
+ * <p>Where:
+ * <ul>
+ * <li>{@code mongodb://} is a required prefix to identify that this is a string in the
+ * standard connection format.</li>
+ * <li>{@code username:password@} are optional. If given, the driver will attempt to
+ * login to a database after connecting to a database server. For some authentication
+ * mechanisms, only the username is specified and the password is not, in which case
+ * the ":" after the username is left off as well.</li>
+ * <li>{@code host1} is the only required part of the URI. It identifies a server
+ * address to connect to.</li>
+ * <li>{@code :portX} is optional and defaults to {@code :27017} if not provided.</li>
+ * <li>{@code /database} is the name of the database to login to and thus is only
+ * relevant if the {@code username:password@} syntax is used. If not specified, the
+     *   "admin" database will be used by default. It has to match the database you
+     *   specify with {@link Write#withDatabase(String)}.</li>
+ * <li>{@code ?options} are connection options. Note that if {@code database} is absent
+ * there is still a {@code /} required between the last {@code host} and the {@code ?}
+ * introducing the options. Options are name=value pairs and the pairs are separated by
+     *   "{@code &}". The {@code KeepAlive} connection option can't be passed via the URI;
+     *   instead, use {@link Write#withKeepAlive(boolean)}. The same applies to the
+     *   {@code MaxConnectionIdleTime} connection option, which is set via
+     *   {@link Write#withMaxConnectionIdleTime(int)}.
+ * </li>
+ * </ul>
+ */
public Write withUri(String uri) {
- return toBuilder().setUri(uri).build();
+ checkArgument(uri != null, "MongoDbIO.write().withUri(uri) called with null uri");
+ return builder().setUri(uri).build();
+ }
+
+ /**
+ * Sets whether socket keep alive is enabled.
+ */
+ public Write withKeepAlive(boolean keepAlive) {
+ return builder().setKeepAlive(keepAlive).build();
+ }
+
+ /**
+     * Sets the maximum idle time, in milliseconds, for a pooled connection.
+ */
+ public Write withMaxConnectionIdleTime(int maxConnectionIdleTime) {
+ return builder().setMaxConnectionIdleTime(maxConnectionIdleTime).build();
}
+ /**
+ * Sets the database to use.
+ */
public Write withDatabase(String database) {
- return toBuilder().setDatabase(database).build();
+ checkArgument(database != null, "MongoDbIO.write().withDatabase(database) called with "
+ + "null database");
+ return builder().setDatabase(database).build();
}
+ /**
+     * Sets the collection to write data to in the database.
+ */
public Write withCollection(String collection) {
- return toBuilder().setCollection(collection).build();
+ checkArgument(collection != null, "MongoDbIO.write().withCollection(collection) called "
+ + "with null collection");
+ return builder().setCollection(collection).build();
}
+ /**
+ * Define the size of the batch to group write operations.
+ */
public Write withBatchSize(long batchSize) {
- return toBuilder().setBatchSize(batchSize).build();
+ checkArgument(batchSize >= 0, "MongoDbIO.write().withBatchSize(batchSize) called with "
+ + "invalid batch size. Batch size has to be >= 0 (currently %d)", batchSize);
+ return builder().setBatchSize(batchSize).build();
}
@Override
@@ -466,10 +629,21 @@ public class MongoDbIO {
@Override
public void validate(PipelineOptions options) {
- checkNotNull(uri(), "uri");
- checkNotNull(database(), "database");
- checkNotNull(collection(), "collection");
- checkNotNull(batchSize(), "batchSize");
+ checkState(uri() != null, "MongoDbIO.write() requires an URI to be set via withUri(uri)");
+ checkState(database() != null, "MongoDbIO.write() requires a database to be set via "
+ + "withDatabase(database)");
+ checkState(collection() != null, "MongoDbIO.write() requires a collection to be set via "
+ + "withCollection(collection)");
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add(DisplayData.item("uri", uri()));
+ builder.add(DisplayData.item("keepAlive", keepAlive()));
+ builder.add(DisplayData.item("maxConnectionIdleTime", maxConnectionIdleTime()));
+ builder.add(DisplayData.item("database", database()));
+ builder.add(DisplayData.item("collection", collection()));
+ builder.add(DisplayData.item("batchSize", batchSize()));
}
private static class WriteFn extends DoFn<Document, Void> {
@@ -483,7 +657,10 @@ public class MongoDbIO {
@Setup
public void createMongoClient() throws Exception {
- client = new MongoClient(new MongoClientURI(spec.uri()));
+ MongoClientOptions.Builder builder = new MongoClientOptions.Builder();
+ builder.socketKeepAlive(spec.keepAlive());
+ builder.maxConnectionIdleTime(spec.maxConnectionIdleTime());
+ client = new MongoClient(new MongoClientURI(spec.uri(), builder));
}
@StartBundle
http://git-wip-us.apache.org/repos/asf/beam/blob/87be64e9/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
index cd26b48..67dbca4 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.mongodb;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
@@ -189,6 +190,42 @@ public class MongoDbIOTest implements Serializable {
}
@Test
+ public void testReadWithCustomConnectionOptions() throws Exception {
+ MongoDbIO.Read read = MongoDbIO.read()
+ .withUri("mongodb://localhost:" + port)
+ .withKeepAlive(false)
+ .withMaxConnectionIdleTime(10)
+ .withDatabase(DATABASE)
+ .withCollection(COLLECTION);
+ assertFalse(read.keepAlive());
+ assertEquals(10, read.maxConnectionIdleTime());
+
+ PCollection<Document> documents = pipeline.apply(read);
+
+ PAssert.thatSingleton(documents.apply("Count All", Count.<Document>globally()))
+ .isEqualTo(1000L);
+
+ PAssert.that(documents
+ .apply("Map Scientist", MapElements.via(new SimpleFunction<Document, KV<String, Void>>() {
+ public KV<String, Void> apply(Document input) {
+ return KV.of(input.getString("scientist"), null);
+ }
+ }))
+ .apply("Count Scientist", Count.<String, Void>perKey())
+ ).satisfies(new SerializableFunction<Iterable<KV<String, Long>>, Void>() {
+ @Override
+ public Void apply(Iterable<KV<String, Long>> input) {
+ for (KV<String, Long> element : input) {
+ assertEquals(100L, element.getValue().longValue());
+ }
+ return null;
+ }
+ });
+
+ pipeline.run();
+ }
+
+ @Test
public void testReadWithFilter() throws Exception {
PCollection<Document> output = pipeline.apply(
[2/2] beam git commit: [BEAM-975] This closes #3118
Posted by jb...@apache.org.
[BEAM-975] This closes #3118
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b400f4a6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b400f4a6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b400f4a6
Branch: refs/heads/master
Commit: b400f4a6f06e5d826d35045658c4a383cdd5953b
Parents: 1476f34 87be64e
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Mon Jun 19 21:24:08 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Mon Jun 19 21:24:08 2017 +0200
----------------------------------------------------------------------
.../apache/beam/sdk/io/mongodb/MongoDbIO.java | 315 +++++++++++++++----
.../beam/sdk/io/mongodb/MongoDbIOTest.java | 37 +++
2 files changed, 283 insertions(+), 69 deletions(-)
----------------------------------------------------------------------