Posted to commits@beam.apache.org by jb...@apache.org on 2016/10/02 14:11:11 UTC
[1/2] incubator-beam git commit: [BEAM-698] Use AutoValue and deal with Document instead of String in MongoDbIO
Repository: incubator-beam
Updated Branches:
refs/heads/master 2e0adaf02 -> 202acd1d6
[BEAM-698] Use AutoValue and deal with Document instead of String in MongoDbIO
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4d95423b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4d95423b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4d95423b
Branch: refs/heads/master
Commit: 4d95423bfe5ede1c48fea9489a18c17079691088
Parents: 2e0adaf
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Sep 28 17:17:22 2016 -0700
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun Oct 2 15:49:49 2016 +0200
----------------------------------------------------------------------
sdks/java/io/mongodb/pom.xml | 6 +
.../apache/beam/sdk/io/mongodb/MongoDbIO.java | 276 ++++++++-----------
.../beam/sdk/io/mongodb/MongoDbIOTest.java | 19 +-
3 files changed, 132 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
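For reference, a minimal usage sketch of the reworked read transform (not part of the commit, and the URI, database, and collection names below are hypothetical): after this change the transform yields a PCollection<Document> instead of a PCollection<String>, so fields can be read off the Document directly rather than re-parsing JSON strings, as the updated test further down also shows.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.bson.Document;

public class MongoDbReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read now produces Documents directly; no manual Document.parse(...) on a JSON String.
    PCollection<Document> docs = pipeline.apply(
        MongoDbIO.read()
            .withUri("mongodb://localhost:27017")            // hypothetical local instance
            .withDatabase("beam")                            // hypothetical database name
            .withCollection("scientists")                    // hypothetical collection name
            .withFilter("{\"scientist\":\"Einstein\"}"));    // optional filter, as in the tests

    // Fields are accessed on the Document itself.
    docs.apply(MapElements.via(new SimpleFunction<Document, KV<String, Void>>() {
      @Override
      public KV<String, Void> apply(Document input) {
        return KV.of(input.getString("scientist"), null);
      }
    }));

    pipeline.run();
  }
}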
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d95423b/sdks/java/io/mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/pom.xml b/sdks/java/io/mongodb/pom.xml
index b7e36af..5555173 100644
--- a/sdks/java/io/mongodb/pom.xml
+++ b/sdks/java/io/mongodb/pom.xml
@@ -94,6 +94,12 @@
<artifactId>joda-time</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.auto.value</groupId>
+ <artifactId>auto-value</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
<!-- test dependencies -->
<dependency>
<groupId>de.flapdoodle.embed</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d95423b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index a54694a..7c2bc28 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -17,9 +17,10 @@
*/
package org.apache.beam.sdk.io.mongodb;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import com.google.common.annotations.VisibleForTesting;
+import com.google.auto.value.AutoValue;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
@@ -98,12 +99,12 @@ public class MongoDbIO {
/** Read data from MongoDB. */
public static Read read() {
- return new Read(new BoundedMongoDbSource(null, null, null, null, 0));
+ return new AutoValue_MongoDbIO_Read.Builder().setNumSplits(0).build();
}
/** Write data to MongoDB. */
public static Write write() {
- return new Write(new Write.MongoDbWriter(null, null, null, 1024L));
+ return new AutoValue_MongoDbIO_Write.Builder().setBatchSize(1024L).build();
}
private MongoDbIO() {
@@ -112,118 +113,97 @@ public class MongoDbIO {
/**
* A {@link PTransform} to read data from MongoDB.
*/
- public static class Read extends PTransform<PBegin, PCollection<String>> {
+ @AutoValue
+ public abstract static class Read extends PTransform<PBegin, PCollection<Document>> {
+ @Nullable abstract String uri();
+ @Nullable abstract String database();
+ @Nullable abstract String collection();
+ @Nullable abstract String filter();
+ abstract int numSplits();
+
+ abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setUri(String uri);
+ abstract Builder setDatabase(String database);
+ abstract Builder setCollection(String collection);
+ abstract Builder setFilter(String filter);
+ abstract Builder setNumSplits(int numSplits);
+ abstract Read build();
+ }
+ /**
+ * Example documentation for withUri.
+ */
public Read withUri(String uri) {
- return new Read(source.withUri(uri));
+ checkNotNull(uri);
+ return toBuilder().setUri(uri).build();
}
public Read withDatabase(String database) {
- return new Read(source.withDatabase(database));
+ checkNotNull(database);
+ return toBuilder().setDatabase(database).build();
}
public Read withCollection(String collection) {
- return new Read(source.withCollection(collection));
+ checkNotNull(collection);
+ return toBuilder().setCollection(collection).build();
}
public Read withFilter(String filter) {
- return new Read(source.withFilter(filter));
+ checkNotNull(filter);
+ return toBuilder().setFilter(filter).build();
}
public Read withNumSplits(int numSplits) {
- return new Read(source.withNumSplits(numSplits));
- }
-
- private final BoundedMongoDbSource source;
-
- private Read(BoundedMongoDbSource source) {
- this.source = source;
+ checkArgument(numSplits >= 0);
+ return toBuilder().setNumSplits(numSplits).build();
}
@Override
- public PCollection<String> apply(PBegin input) {
- return input.apply(org.apache.beam.sdk.io.Read.from(getSource()));
- }
-
- /**
- * Creates a {@link BoundedSource} with the configuration in {@link Read}.
- */
- @VisibleForTesting
- BoundedSource<String> getSource() {
- return source;
+ public PCollection<Document> apply(PBegin input) {
+ return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedMongoDbSource(this)));
}
@Override
public void validate(PBegin input) {
- source.validate();
+ checkNotNull(uri(), "uri");
+ checkNotNull(database(), "database");
+ checkNotNull(collection(), "collection");
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
-
- source.populateDisplayData(builder);
+ builder.add(DisplayData.item("uri", uri()));
+ builder.add(DisplayData.item("database", database()));
+ builder.add(DisplayData.item("collection", collection()));
+ builder.addIfNotNull(DisplayData.item("filter", filter()));
+ builder.add(DisplayData.item("numSplit", numSplits()));
}
-
}
- private static class BoundedMongoDbSource extends BoundedSource<String> {
-
- public BoundedMongoDbSource withUri(String uri) {
- return new BoundedMongoDbSource(uri, database, collection, filter, numSplits);
- }
-
- public BoundedMongoDbSource withDatabase(String database) {
- return new BoundedMongoDbSource(uri, database, collection, filter, numSplits);
- }
-
- public BoundedMongoDbSource withCollection(String collection) {
- return new BoundedMongoDbSource(uri, database, collection, filter, numSplits);
- }
+ private static class BoundedMongoDbSource extends BoundedSource<Document> {
+ private Read spec;
- public BoundedMongoDbSource withFilter(String filter) {
- return new BoundedMongoDbSource(uri, database, collection, filter, numSplits);
- }
-
- public BoundedMongoDbSource withNumSplits(int numSplits) {
- return new BoundedMongoDbSource(uri, database, collection, filter, numSplits);
- }
-
- private final String uri;
- private final String database;
- private final String collection;
- @Nullable
- private final String filter;
- private final int numSplits;
-
- public BoundedMongoDbSource(String uri, String database, String collection, String filter,
- int numSplits) {
- this.uri = uri;
- this.database = database;
- this.collection = collection;
- this.filter = filter;
- this.numSplits = numSplits;
+ private BoundedMongoDbSource(Read spec) {
+ this.spec = spec;
}
@Override
- public Coder getDefaultOutputCoder() {
- return SerializableCoder.of(String.class);
+ public Coder<Document> getDefaultOutputCoder() {
+ return SerializableCoder.of(Document.class);
}
@Override
public void validate() {
- checkNotNull(uri, "uri");
- checkNotNull(database, "database");
- checkNotNull(collection, "collection");
+ spec.validate(null);
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- builder.add(DisplayData.item("uri", uri));
- builder.add(DisplayData.item("database", database));
- builder.add(DisplayData.item("collection", collection));
- builder.addIfNotNull(DisplayData.item("filter", filter));
- builder.add(DisplayData.item("numSplit", numSplits));
+ spec.populateDisplayData(builder);
}
@Override
@@ -232,39 +212,35 @@ public class MongoDbIO {
}
@Override
- public BoundedReader createReader(PipelineOptions options) {
+ public BoundedReader<Document> createReader(PipelineOptions options) {
return new BoundedMongoDbReader(this);
}
@Override
public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) {
- long estimatedSizeBytes = 0L;
-
MongoClient mongoClient = new MongoClient();
- MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
- MongoCollection mongoCollection = mongoDatabase.getCollection(collection);
+ MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database());
// get the Mongo collStats object
// it gives the size for the entire collection
BasicDBObject stat = new BasicDBObject();
- stat.append("collStats", collection);
+ stat.append("collStats", spec.collection());
Document stats = mongoDatabase.runCommand(stat);
- estimatedSizeBytes = Long.valueOf(stats.get("size").toString());
- return estimatedSizeBytes;
+ return Long.valueOf(stats.get("size").toString());
}
@Override
- public List<BoundedSource<String>> splitIntoBundles(long desiredBundleSizeBytes,
+ public List<BoundedSource<Document>> splitIntoBundles(long desiredBundleSizeBytes,
PipelineOptions options) {
MongoClient mongoClient = new MongoClient();
- MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
+ MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database());
- List<Document> splitKeys = null;
- if (numSplits > 0) {
+ List<Document> splitKeys;
+ if (spec.numSplits() > 0) {
// the user defines his desired number of splits
// calculate the batch size
long estimatedSizeBytes = getEstimatedSizeBytes(options);
- desiredBundleSizeBytes = estimatedSizeBytes / numSplits;
+ desiredBundleSizeBytes = estimatedSizeBytes / spec.numSplits();
}
// the desired batch size is small, using default chunk size of 1MB
@@ -275,7 +251,7 @@ public class MongoDbIO {
// now we have the batch size (provided by user or provided by the runner)
// we use Mongo splitVector command to get the split keys
BasicDBObject splitVectorCommand = new BasicDBObject();
- splitVectorCommand.append("splitVector", database + "." + collection);
+ splitVectorCommand.append("splitVector", spec.database() + "." + spec.collection());
splitVectorCommand.append("keyPattern", new BasicDBObject().append("_id", 1));
splitVectorCommand.append("force", false);
// maxChunkSize is the Mongo partition size in MB
@@ -284,7 +260,7 @@ public class MongoDbIO {
Document splitVectorCommandResult = mongoDatabase.runCommand(splitVectorCommand);
splitKeys = (List<Document>) splitVectorCommandResult.get("splitKeys");
- List<BoundedSource<String>> sources = new ArrayList<>();
+ List<BoundedSource<Document>> sources = new ArrayList<>();
if (splitKeys.size() < 1) {
LOGGER.debug("Split keys is low, using an unique source");
sources.add(this);
@@ -292,8 +268,8 @@ public class MongoDbIO {
}
LOGGER.debug("Number of splits is {}", splitKeys.size());
- for (String shardFilter : splitKeysToFilters(splitKeys, filter)) {
- sources.add(this.withFilter(shardFilter));
+ for (String shardFilter : splitKeysToFilters(splitKeys, spec.filter())) {
+ sources.add(new BoundedMongoDbSource(spec.withFilter(shardFilter)));
}
return sources;
@@ -362,13 +338,12 @@ public class MongoDbIO {
}
}
- private static class BoundedMongoDbReader extends BoundedSource.BoundedReader<String> {
-
+ private static class BoundedMongoDbReader extends BoundedSource.BoundedReader<Document> {
private final BoundedMongoDbSource source;
private MongoClient client;
private MongoCursor<Document> cursor;
- private String current;
+ private Document current;
public BoundedMongoDbReader(BoundedMongoDbSource source) {
this.source = source;
@@ -376,16 +351,17 @@ public class MongoDbIO {
@Override
public boolean start() {
- client = new MongoClient(new MongoClientURI(source.uri));
+ Read spec = source.spec;
+ client = new MongoClient(new MongoClientURI(spec.uri()));
- MongoDatabase mongoDatabase = client.getDatabase(source.database);
+ MongoDatabase mongoDatabase = client.getDatabase(spec.database());
- MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(source.collection);
+ MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(spec.collection());
- if (source.filter == null) {
+ if (spec.filter() == null) {
cursor = mongoCollection.find().iterator();
} else {
- Document bson = Document.parse(source.filter);
+ Document bson = Document.parse(spec.filter());
cursor = mongoCollection.find(bson).iterator();
}
@@ -395,7 +371,7 @@ public class MongoDbIO {
@Override
public boolean advance() {
if (cursor.hasNext()) {
- current = cursor.next().toJson();
+ current = cursor.next();
return true;
} else {
return false;
@@ -403,12 +379,12 @@ public class MongoDbIO {
}
@Override
- public BoundedSource getCurrentSource() {
+ public BoundedMongoDbSource getCurrentSource() {
return source;
}
@Override
- public String getCurrent() {
+ public Document getCurrent() {
return current;
}
@@ -433,84 +409,66 @@ public class MongoDbIO {
/**
* A {@link PTransform} to write to a MongoDB database.
*/
- public static class Write extends PTransform<PCollection<String>, PDone> {
+ @AutoValue
+ public abstract static class Write extends PTransform<PCollection<Document>, PDone> {
+ @Nullable abstract String uri();
+ @Nullable abstract String database();
+ @Nullable abstract String collection();
+ abstract long batchSize();
+
+ abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setUri(String uri);
+ abstract Builder setDatabase(String database);
+ abstract Builder setCollection(String collection);
+ abstract Builder setBatchSize(long batchSize);
+ abstract Write build();
+ }
public Write withUri(String uri) {
- return new Write(writer.withUri(uri));
+ return toBuilder().setUri(uri).build();
}
public Write withDatabase(String database) {
- return new Write(writer.withDatabase(database));
+ return toBuilder().setDatabase(database).build();
}
public Write withCollection(String collection) {
- return new Write(writer.withCollection(collection));
+ return toBuilder().setCollection(collection).build();
}
public Write withBatchSize(long batchSize) {
- return new Write(writer.withBatchSize(batchSize));
- }
-
- private final MongoDbWriter writer;
-
- private Write(MongoDbWriter writer) {
- this.writer = writer;
+ return toBuilder().setBatchSize(batchSize).build();
}
@Override
- public PDone apply(PCollection<String> input) {
- input.apply(ParDo.of(writer));
+ public PDone apply(PCollection<Document> input) {
+ input.apply(ParDo.of(new WriteFn(this)));
return PDone.in(input.getPipeline());
}
@Override
- public void validate(PCollection<String> input) {
- writer.validate();
+ public void validate(PCollection<Document> input) {
+ checkNotNull(uri(), "uri");
+ checkNotNull(database(), "database");
+ checkNotNull(collection(), "collection");
+ checkNotNull(batchSize(), "batchSize");
}
- private static class MongoDbWriter extends DoFn<String, Void> {
-
- private final String uri;
- private final String database;
- private final String collection;
- private final long batchSize;
-
+ private static class WriteFn extends DoFn<Document, Void> {
+ private final Write spec;
private MongoClient client;
private List<Document> batch;
- public MongoDbWriter(String uri, String database, String collection, long batchSize) {
- this.uri = uri;
- this.database = database;
- this.collection = collection;
- this.batchSize = batchSize;
- }
-
- public MongoDbWriter withUri(String uri) {
- return new MongoDbWriter(uri, database, collection, batchSize);
- }
-
- public MongoDbWriter withDatabase(String database) {
- return new MongoDbWriter(uri, database, collection, batchSize);
- }
-
- public MongoDbWriter withCollection(String collection) {
- return new MongoDbWriter(uri, database, collection, batchSize);
- }
-
- public MongoDbWriter withBatchSize(long batchSize) {
- return new MongoDbWriter(uri, database, collection, batchSize);
- }
-
- public void validate() {
- checkNotNull(uri, "uri");
- checkNotNull(database, "database");
- checkNotNull(collection, "collection");
- checkNotNull(batchSize, "batchSize");
+ public WriteFn(Write spec) {
+ this.spec = spec;
}
@Setup
public void createMongoClient() throws Exception {
- client = new MongoClient(new MongoClientURI(uri));
+ client = new MongoClient(new MongoClientURI(spec.uri()));
}
@StartBundle
@@ -520,18 +478,18 @@ public class MongoDbIO {
@ProcessElement
public void processElement(ProcessContext ctx) throws Exception {
- String value = ctx.element();
-
- batch.add(Document.parse(ctx.element()));
- if (batch.size() >= batchSize) {
+ // Need to copy the document because mongoCollection.insertMany() will mutate it
+ // before inserting (will assign an id).
+ batch.add(new Document(ctx.element()));
+ if (batch.size() >= spec.batchSize()) {
finishBundle(ctx);
}
}
@FinishBundle
public void finishBundle(Context ctx) throws Exception {
- MongoDatabase mongoDatabase = client.getDatabase(database);
- MongoCollection mongoCollection = mongoDatabase.getCollection(collection);
+ MongoDatabase mongoDatabase = client.getDatabase(spec.database());
+ MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(spec.collection());
mongoCollection.insertMany(batch);
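Similarly, a minimal sketch of the reworked write path (not part of the commit; instance and collection names are hypothetical): elements are now org.bson.Document values, each element is copied before insert as noted in the WriteFn above, and inserts are flushed in batches of withBatchSize() (1024 by default, per write()).

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.bson.Document;

public class MongoDbWriteSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Elements to write are Documents, built directly or via Document.parse(...) as in the test below.
    List<Document> data = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
      data.add(new Document("scientist", "Test " + i));
    }

    pipeline.apply(Create.of(data))
        .apply(MongoDbIO.write()
            .withUri("mongodb://localhost:27017")   // hypothetical local instance
            .withDatabase("test")
            .withCollection("scientists")           // hypothetical collection name
            .withBatchSize(512L));                  // optional; defaults to 1024

    pipeline.run();
  }
}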
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d95423b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
index 308e071..9a88267 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
@@ -127,20 +127,19 @@ public class MongoDbIOTest implements Serializable {
public void testFullRead() throws Exception {
TestPipeline pipeline = TestPipeline.create();
- PCollection<String> output = pipeline.apply(
+ PCollection<Document> output = pipeline.apply(
MongoDbIO.read()
.withUri("mongodb://localhost:" + PORT)
.withDatabase(DATABASE)
.withCollection(COLLECTION));
- PAssert.thatSingleton(output.apply("Count All", Count.<String>globally()))
+ PAssert.thatSingleton(output.apply("Count All", Count.<Document>globally()))
.isEqualTo(1000L);
PAssert.that(output
- .apply("Map Scientist", MapElements.via(new SimpleFunction<String, KV<String, Void>>() {
- public KV<String, Void> apply(String input) {
- Document bson = Document.parse(input);
- return KV.of(bson.getString("scientist"), null);
+ .apply("Map Scientist", MapElements.via(new SimpleFunction<Document, KV<String, Void>>() {
+ public KV<String, Void> apply(Document input) {
+ return KV.of(input.getString("scientist"), null);
}
}))
.apply("Count Scientist", Count.<String, Void>perKey())
@@ -162,14 +161,14 @@ public class MongoDbIOTest implements Serializable {
public void testReadWithFilter() throws Exception {
TestPipeline pipeline = TestPipeline.create();
- PCollection<String> output = pipeline.apply(
+ PCollection<Document> output = pipeline.apply(
MongoDbIO.read()
.withUri("mongodb://localhost:" + PORT)
.withDatabase(DATABASE)
.withCollection(COLLECTION)
.withFilter("{\"scientist\":\"Einstein\"}"));
- PAssert.thatSingleton(output.apply("Count", Count.<String>globally()))
+ PAssert.thatSingleton(output.apply("Count", Count.<Document>globally()))
.isEqualTo(100L);
pipeline.run();
@@ -180,9 +179,9 @@ public class MongoDbIOTest implements Serializable {
public void testWrite() throws Exception {
TestPipeline pipeline = TestPipeline.create();
- ArrayList<String> data = new ArrayList<>();
+ ArrayList<Document> data = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
- data.add(String.format("{\"scientist\":\"Test %s\"}", i));
+ data.add(Document.parse(String.format("{\"scientist\":\"Test %s\"}", i)));
}
pipeline.apply(Create.of(data))
.apply(MongoDbIO.write().withUri("mongodb://localhost:" + PORT).withDatabase("test")
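The auto-value dependency added to the pom above provides the annotation processor that generates the AutoValue_MongoDbIO_Read and AutoValue_MongoDbIO_Write classes referenced in the diff. Stripped to its essentials, the pattern looks roughly like this (a standalone sketch with made-up names, not the committed code): each withX() builds a modified copy via toBuilder(), so the transform stays immutable without hand-maintaining a copy constructor for every field.

import com.google.auto.value.AutoValue;
import javax.annotation.Nullable;

@AutoValue
abstract class ReadConfig {
  @Nullable abstract String uri();
  abstract int numSplits();

  abstract Builder toBuilder();

  static ReadConfig create() {
    // AutoValue generates AutoValue_ReadConfig (and its Builder) at compile time.
    return new AutoValue_ReadConfig.Builder().setNumSplits(0).build();
  }

  // Returns a modified copy, leaving the original instance untouched.
  ReadConfig withUri(String uri) {
    return toBuilder().setUri(uri).build();
  }

  @AutoValue.Builder
  abstract static class Builder {
    abstract Builder setUri(String uri);
    abstract Builder setNumSplits(int numSplits);
    abstract ReadConfig build();
  }
}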
[2/2] incubator-beam git commit: [BEAM-698] This closes #1033
Posted by jb...@apache.org.
[BEAM-698] This closes #1033
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/202acd1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/202acd1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/202acd1d
Branch: refs/heads/master
Commit: 202acd1d6196b12b0d189b947ef202e32581cbed
Parents: 2e0adaf 4d95423
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Sun Oct 2 15:51:40 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun Oct 2 15:51:40 2016 +0200
----------------------------------------------------------------------
sdks/java/io/mongodb/pom.xml | 6 +
.../apache/beam/sdk/io/mongodb/MongoDbIO.java | 276 ++++++++-----------
.../beam/sdk/io/mongodb/MongoDbIOTest.java | 19 +-
3 files changed, 132 insertions(+), 169 deletions(-)
----------------------------------------------------------------------