You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2016/10/06 12:18:39 UTC
[1/2] incubator-beam git commit: [BEAM-698] Use AutoValue in MongoDB
GridFS
Repository: incubator-beam
Updated Branches:
refs/heads/master f27354f77 -> 8130bc36f
[BEAM-698] Use AutoValue in MongoDB GridFS
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/26474c7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/26474c7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/26474c7e
Branch: refs/heads/master
Commit: 26474c7ee6bcd7a89b977ca045d9f557112a0113
Parents: f27354f
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Mon Oct 3 08:02:07 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Thu Oct 6 13:46:17 2016 +0200
----------------------------------------------------------------------
.../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 197 ++++++++++---------
.../apache/beam/sdk/io/mongodb/MongoDbIO.java | 20 +-
.../sdk/io/mongodb/MongoDBGridFSIOTest.java | 13 +-
3 files changed, 117 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26474c7e/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
index cebda64..bdf0e53 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
@@ -18,6 +18,8 @@
package org.apache.beam.sdk.io.mongodb;
import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
import com.mongodb.DB;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
@@ -53,23 +55,22 @@ import org.bson.types.ObjectId;
import org.joda.time.Duration;
import org.joda.time.Instant;
-
/**
- * IO to read and write data on MongoDB GridFS.
- * <p>
+ * IO to read and write data on MongoDB GridFS.
+ *
* <h3>Reading from MongoDB via GridFS</h3>
- * <p>
+ *
* <p>MongoDbGridFSIO source returns a bounded collection of Objects as {@code PCollection<T>}.
- * <p>
+ *
* <p>To configure the MongoDB GridFS source, you can provide the connection URI, the database name
- * and the bucket name. If unspecified, the default values from the GridFS driver are used.
+ * and the bucket name. If unspecified, the default values from the GridFS driver are used.</p>
*
- * The following example illustrates various options for configuring the
+ * <p>The following example illustrates various options for configuring the
* source:</p>
- * <p>
+ *
* <pre>{@code
*
- * pipeline.apply(MongoDbGridFSIO.read()
+ * pipeline.apply(MongoDbGridFSIO.<String>read()
* .withUri("mongodb://localhost:27017")
* .withDatabase("my-database")
* .withBucket("my-bucket"))
@@ -79,11 +80,12 @@ import org.joda.time.Instant;
* <p>The source also accepts an optional configuration: {@code withQueryFilter()} allows you to
* define a JSON filter to get subset of files in the database.</p>
*
- * <p>There is also an optional {@code Parser} that can be specified that can be used to
- * parse the InputStream into objects usable with Beam. By default, MongoDbGridFSIO will parse
- * into Strings, splitting on line breaks and using the uploadDate of the file as the timestamp.
+ * <p>There is also an optional {@code Parser} (and associated {@code Coder}) that can be
+ * specified that can be used to parse the InputStream into objects usable with Beam. By default,
+ * MongoDbGridFSIO will parse into Strings, splitting on line breaks and using the uploadDate of
+ * the file as the timestamp.
* When using a parser that outputs with custom timestamps, you may also need to specify
- * the allowedTimestampSkew option.
+ * the allowedTimestampSkew option.</p>
*/
public class MongoDbGridFSIO {
@@ -96,14 +98,14 @@ public class MongoDbGridFSIO {
* creation timestamp.
* @param output
*/
- public void output(T output);
+ void output(T output);
/**
* Output the object using the specified timestamp.
* @param output
* @param timestamp
*/
- public void output(T output, Instant timestamp);
+ void output(T output, Instant timestamp);
}
/**
@@ -112,7 +114,7 @@ public class MongoDbGridFSIO {
* @param <T>
*/
public interface Parser<T> extends Serializable {
- public void parse(GridFSDBFile input, ParserCallback<T> callback) throws IOException;
+ void parse(GridFSDBFile input, ParserCallback<T> callback) throws IOException;
}
/**
@@ -134,71 +136,91 @@ public class MongoDbGridFSIO {
}
};
- /** Read data from GridFS. */
+ /** Read data from GridFS. Default behavior with String. */
public static Read<String> read() {
- return new Read<String>(new Read.BoundedGridFSSource(null, null, null, null,
- null), TEXT_PARSER, StringUtf8Coder.of(), Duration.ZERO);
+ return new AutoValue_MongoDbGridFSIO_Read.Builder<String>().build()
+ .withParser(TEXT_PARSER).withCoder(StringUtf8Coder.of());
}
- static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+ /**
+ * A {@link PTransform} to read data from MongoDB GridFS.
+ */
+ @AutoValue
+ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+
+ @Nullable abstract String uri();
+ @Nullable abstract String database();
+ @Nullable abstract String bucket();
+ @Nullable abstract Parser<T> parser();
+ @Nullable abstract Coder<T> coder();
+ @Nullable abstract Duration skew();
+ @Nullable abstract String filter();
+
+ abstract Builder<T> toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ abstract Builder<T> setUri(String uri);
+ abstract Builder<T> setDatabase(String database);
+ abstract Builder<T> setBucket(String bucket);
+ abstract Builder<T> setParser(Parser<T> parser);
+ abstract Builder<T> setCoder(Coder<T> coder);
+ abstract Builder<T> setSkew(Duration skew);
+ abstract Builder<T> setFilter(String filter);
+ abstract Read<T> build();
+ }
+
public Read<T> withUri(String uri) {
- return new Read<T>(new BoundedGridFSSource(uri, options.database,
- options.bucket, options.filterJson,
- null), parser, coder, allowedTimestampSkew);
+ checkNotNull(uri);
+ return toBuilder().setUri(uri).build();
}
public Read<T> withDatabase(String database) {
- return new Read<T>(new BoundedGridFSSource(options.uri, database,
- options.bucket, options.filterJson,
- null), parser, coder, allowedTimestampSkew);
+ checkNotNull(database);
+ return toBuilder().setDatabase(database).build();
}
public Read<T> withBucket(String bucket) {
- return new Read<T>(new BoundedGridFSSource(options.uri, options.database, bucket,
- options.filterJson, null), parser, coder, allowedTimestampSkew);
+ checkNotNull(bucket);
+ return toBuilder().setBucket(bucket).build();
}
- public <X> Read<X> withParser(Parser<X> f) {
- return withParser(f, null);
- }
- public <X> Read<X> withParser(Parser<X> f, Coder<X> coder) {
- checkNotNull(f, "Parser cannot be null");
- //coder can be null as it can be set on the output directly
- return new Read<X>(new BoundedGridFSSource(options.uri, options.database,
- options.bucket, options.filterJson, null), f, coder, allowedTimestampSkew);
+ public <X> Read<X> withParser(Parser<X> parser) {
+ checkNotNull(parser);
+ Builder<X> builder = (Builder<X>) toBuilder();
+ return builder.setParser(parser).setCoder(null).build();
}
- public Read<T> allowedTimestampSkew(Duration skew) {
- return new Read<T>(new BoundedGridFSSource(options.uri, options.database,
- options.bucket, options.filterJson, null),
- parser, coder, skew == null ? Duration.ZERO : skew);
+
+ public Read<T> withCoder(Coder<T> coder) {
+ checkNotNull(coder);
+ return toBuilder().setCoder(coder).build();
}
- public Read<T> withQueryFilter(String filterJson) {
- return new Read<T>(new BoundedGridFSSource(options.uri, options.database,
- options.bucket, filterJson, null), parser, coder, allowedTimestampSkew);
+ public Read<T> withSkew(Duration skew) {
+ return toBuilder().setSkew(skew == null ? Duration.ZERO : skew).build();
}
- private final BoundedGridFSSource options;
- private final Parser<T> parser;
- private final Coder<T> coder;
- private final Duration allowedTimestampSkew;
-
- Read(BoundedGridFSSource options, Parser<T> parser,
- Coder<T> coder, Duration allowedTimestampSkew) {
- this.options = options;
- this.parser = parser;
- this.allowedTimestampSkew = allowedTimestampSkew;
- this.coder = coder;
+ public Read<T> withFilter(String filter) {
+ return toBuilder().setFilter(filter).build();
}
- BoundedGridFSSource getSource() {
- return options;
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.addIfNotNull(DisplayData.item("uri", uri()));
+ builder.addIfNotNull(DisplayData.item("database", database()));
+ builder.addIfNotNull(DisplayData.item("bucket", bucket()));
+ builder.addIfNotNull(DisplayData.item("parser", parser().getClass().getName()));
+ builder.addIfNotNull(DisplayData.item("coder", coder().getClass().getName()));
+ builder.addIfNotNull(DisplayData.item("skew", skew()));
+ builder.addIfNotNull(DisplayData.item("filter", filter()));
}
@Override
public PCollection<T> apply(PBegin input) {
+ final BoundedGridFSSource source = new BoundedGridFSSource(this, null);
org.apache.beam.sdk.io.Read.Bounded<ObjectId> objectIds =
- org.apache.beam.sdk.io.Read.from(options);
+ org.apache.beam.sdk.io.Read.from(source);
PCollection<T> output = input.getPipeline().apply(objectIds)
.apply(ParDo.of(new DoFn<ObjectId, T>() {
Mongo mongo;
@@ -206,8 +228,8 @@ public class MongoDbGridFSIO {
@Setup
public void setup() {
- mongo = options.setupMongo();
- gridfs = options.setupGridFS(mongo);
+ mongo = source.setupMongo();
+ gridfs = source.setupGridFS(mongo);
}
@Teardown
@@ -219,7 +241,7 @@ public class MongoDbGridFSIO {
public void processElement(final ProcessContext c) throws IOException {
ObjectId oid = c.element();
GridFSDBFile file = gridfs.find(oid);
- parser.parse(file, new ParserCallback<T>() {
+ parser().parse(file, new ParserCallback<T>() {
@Override
public void output(T output, Instant timestamp) {
checkNotNull(timestamp);
@@ -235,49 +257,42 @@ public class MongoDbGridFSIO {
@Override
public Duration getAllowedTimestampSkew() {
- return allowedTimestampSkew;
+ return skew();
}
}));
- if (coder != null) {
- output.setCoder(coder);
+ if (coder() != null) {
+ output.setCoder(coder());
}
return output;
}
- static class BoundedGridFSSource extends BoundedSource<ObjectId> {
- @Nullable
- private final String uri;
- @Nullable
- private final String database;
- @Nullable
- private final String bucket;
- @Nullable
- private final String filterJson;
+ /**
+ * A {@link BoundedSource} for MongoDB GridFS.
+ */
+ protected static class BoundedGridFSSource extends BoundedSource<ObjectId> {
+
+ private Read spec;
+
@Nullable
private List<ObjectId> objectIds;
- BoundedGridFSSource(String uri, String database,
- String bucket, String filterJson,
- List<ObjectId> objectIds) {
- this.uri = uri;
- this.database = database;
- this.bucket = bucket;
+ BoundedGridFSSource(Read spec, List<ObjectId> objectIds) {
+ this.spec = spec;
this.objectIds = objectIds;
- this.filterJson = filterJson;
}
private Mongo setupMongo() {
- return uri == null ? new Mongo() : new Mongo(new MongoURI(uri));
+ return spec.uri() == null ? new Mongo() : new Mongo(new MongoURI(spec.uri()));
}
private GridFS setupGridFS(Mongo mongo) {
- DB db = database == null ? mongo.getDB("gridfs") : mongo.getDB(database);
- return bucket == null ? new GridFS(db) : new GridFS(db, bucket);
+ DB db = spec.database() == null ? mongo.getDB("gridfs") : mongo.getDB(spec.database());
+ return spec.bucket() == null ? new GridFS(db) : new GridFS(db, spec.bucket());
}
private DBCursor createCursor(GridFS gridfs) {
- if (filterJson != null) {
- DBObject query = (DBObject) JSON.parse(filterJson);
+ if (spec.filter() != null) {
+ DBObject query = (DBObject) JSON.parse(spec.filter());
return gridfs.getFileList(query).sort(null);
}
return gridfs.getFileList().sort(null);
@@ -297,9 +312,7 @@ public class MongoDbGridFSIO {
GridFSDBFile file = (GridFSDBFile) cursor.next();
long len = file.getLength();
if ((size + len) > desiredBundleSizeBytes && !objects.isEmpty()) {
- list.add(new BoundedGridFSSource(uri, database, bucket,
- filterJson,
- objects));
+ list.add(new BoundedGridFSSource(spec, objects));
size = 0;
objects = new ArrayList<>();
}
@@ -307,9 +320,7 @@ public class MongoDbGridFSIO {
size += len;
}
if (!objects.isEmpty() || list.isEmpty()) {
- list.add(new BoundedGridFSSource(uri, database, bucket,
- filterJson,
- objects));
+ list.add(new BoundedGridFSSource(spec, objects));
}
return list;
} finally {
@@ -351,11 +362,7 @@ public class MongoDbGridFSIO {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder.addIfNotNull(DisplayData.item("uri", uri));
- builder.addIfNotNull(DisplayData.item("database", database));
- builder.addIfNotNull(DisplayData.item("bucket", bucket));
- builder.addIfNotNull(DisplayData.item("filterJson", filterJson));
+ spec.populateDisplayData(builder);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26474c7e/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 7c2bc28..d5659e9 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -47,16 +47,16 @@ import org.slf4j.LoggerFactory;
/**
* IO to read and write data on MongoDB.
- * <p>
+ *
* <h3>Reading from MongoDB</h3>
- * <p>
+ *
* <p>MongoDbIO source returns a bounded collection of String as {@code PCollection<String>}.
* The String is the JSON form of the MongoDB Document.</p>
- * <p>
+ *
* <p>To configure the MongoDB source, you have to provide the connection URI, the database name
* and the collection name. The following example illustrates various options for configuring the
* source:</p>
- * <p>
+ *
* <pre>{@code
*
* pipeline.apply(MongoDbIO.read()
@@ -68,17 +68,17 @@ import org.slf4j.LoggerFactory;
* // rest of the settings are optional
*
* }</pre>
- * <p>
+ *
* <p>The source also accepts an optional configuration: {@code withFilter()} allows you to
* define a JSON filter to get subset of data.</p>
- * <p>
+ *
* <h3>Writing to MongoDB</h3>
- * <p>
+ *
* <p>MongoDB sink supports writing of Document (as JSON String) in a MongoDB.</p>
- * <p>
+ *
* <p>To configure a MongoDB sink, you must specify a connection {@code URI}, a {@code Database}
* name, a {@code Collection} name. For instance:</p>
- * <p>
+ *
* <pre>{@code
*
* pipeline
@@ -91,8 +91,6 @@ import org.slf4j.LoggerFactory;
*
* }</pre>
*/
-// TODO instead of JSON String, does it make sense to populate the PCollection with BSON Document or
-// DBObject ??
public class MongoDbIO {
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbIO.class);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26474c7e/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
index e5e5c8f..5f4d122 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
@@ -75,7 +75,6 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* Test on the MongoDbGridFSIO.
*/
@@ -169,7 +168,7 @@ public class MongoDBGridFSIOTest implements Serializable {
TestPipeline pipeline = TestPipeline.create();
PCollection<String> output = pipeline.apply(
- MongoDbGridFSIO.read()
+ MongoDbGridFSIO.<String>read()
.withUri("mongodb://localhost:" + PORT)
.withDatabase(DATABASE));
@@ -199,7 +198,7 @@ public class MongoDBGridFSIOTest implements Serializable {
TestPipeline pipeline = TestPipeline.create();
PCollection<KV<String, Integer>> output = pipeline.apply(
- MongoDbGridFSIO.read()
+ MongoDbGridFSIO.<KV<String, Integer>>read()
.withUri("mongodb://localhost:" + PORT)
.withDatabase(DATABASE)
.withBucket("mapBucket")
@@ -223,8 +222,8 @@ public class MongoDBGridFSIOTest implements Serializable {
}
}
})
- .allowedTimestampSkew(new Duration(3601000L)))
- .setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
+ .withSkew(new Duration(3601000L))
+ .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));
PAssert.thatSingleton(output.apply("Count All", Count.<KV<String, Integer>>globally()))
.isEqualTo(50100L);
@@ -246,11 +245,11 @@ public class MongoDBGridFSIOTest implements Serializable {
@Test
public void testSplit() throws Exception {
PipelineOptions options = PipelineOptionsFactory.create();
- MongoDbGridFSIO.Read<String> read = MongoDbGridFSIO.read()
+ MongoDbGridFSIO.Read<String> read = MongoDbGridFSIO.<String>read()
.withUri("mongodb://localhost:" + PORT)
.withDatabase(DATABASE);
- BoundedGridFSSource src = read.getSource();
+ BoundedGridFSSource src = new BoundedGridFSSource(read, null);
// make sure 2 files can fit in
long desiredBundleSizeBytes = (src.getEstimatedSizeBytes(options) * 2L) / 5L + 1000;
[2/2] incubator-beam git commit: [BEAM-698] This closes #1054
Posted by jb...@apache.org.
[BEAM-698] This closes #1054
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8130bc36
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8130bc36
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8130bc36
Branch: refs/heads/master
Commit: 8130bc36feca77737a4e171e14307f53410201c7
Parents: f27354f 26474c7
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Thu Oct 6 14:02:34 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Thu Oct 6 14:02:34 2016 +0200
----------------------------------------------------------------------
.../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 197 ++++++++++---------
.../apache/beam/sdk/io/mongodb/MongoDbIO.java | 20 +-
.../sdk/io/mongodb/MongoDBGridFSIOTest.java | 13 +-
3 files changed, 117 insertions(+), 113 deletions(-)
----------------------------------------------------------------------