You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2016/10/06 12:18:39 UTC

[1/2] incubator-beam git commit: [BEAM-698] Use AutoValue in MongoDB GridFS

Repository: incubator-beam
Updated Branches:
  refs/heads/master f27354f77 -> 8130bc36f


[BEAM-698] Use AutoValue in MongoDB GridFS


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/26474c7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/26474c7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/26474c7e

Branch: refs/heads/master
Commit: 26474c7ee6bcd7a89b977ca045d9f557112a0113
Parents: f27354f
Author: Jean-Baptiste Onofr� <jb...@apache.org>
Authored: Mon Oct 3 08:02:07 2016 +0200
Committer: Jean-Baptiste Onofr� <jb...@apache.org>
Committed: Thu Oct 6 13:46:17 2016 +0200

----------------------------------------------------------------------
 .../beam/sdk/io/mongodb/MongoDbGridFSIO.java    | 197 ++++++++++---------
 .../apache/beam/sdk/io/mongodb/MongoDbIO.java   |  20 +-
 .../sdk/io/mongodb/MongoDBGridFSIOTest.java     |  13 +-
 3 files changed, 117 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26474c7e/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
index cebda64..bdf0e53 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
@@ -18,6 +18,8 @@
 package org.apache.beam.sdk.io.mongodb;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
 import com.mongodb.DB;
 import com.mongodb.DBCursor;
 import com.mongodb.DBObject;
@@ -53,23 +55,22 @@ import org.bson.types.ObjectId;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
-
 /**
-  * IO to read and write data on MongoDB GridFS.
- * <p>
+ * IO to read and write data on MongoDB GridFS.
+ *
  * <h3>Reading from MongoDB via GridFS</h3>
- * <p>
+ *
  * <p>MongoDbGridFSIO source returns a bounded collection of Objects as {@code PCollection<T>}.
- * <p>
+ *
  * <p>To configure the MongoDB GridFS source, you can provide the connection URI, the database name
- * and the bucket name.  If unspecified, the default values from the GridFS driver are used.
+ * and the bucket name.  If unspecified, the default values from the GridFS driver are used.</p>
  *
- * The following example illustrates various options for configuring the
+ * <p>The following example illustrates various options for configuring the
  * source:</p>
- * <p>
+ *
  * <pre>{@code
  *
- * pipeline.apply(MongoDbGridFSIO.read()
+ * pipeline.apply(MongoDbGridFSIO.<String>read()
  *   .withUri("mongodb://localhost:27017")
  *   .withDatabase("my-database")
  *   .withBucket("my-bucket"))
@@ -79,11 +80,12 @@ import org.joda.time.Instant;
  * <p>The source also accepts an optional configuration: {@code withQueryFilter()} allows you to
  * define a JSON filter to get subset of files in the database.</p>
  *
- * <p>There is also an optional {@code Parser} that can be specified that can be used to
- * parse the InputStream into objects usable with Beam.  By default, MongoDbGridFSIO will parse
- * into Strings, splitting on line breaks and using the uploadDate of the file as the timestamp.
+ * <p>There is also an optional {@code Parser} (and associated {@code Coder}) that can be
+ * specified that can be used to parse the InputStream into objects usable with Beam. By default,
+ * MongoDbGridFSIO will parse into Strings, splitting on line breaks and using the uploadDate of
+ * the file as the timestamp.
  * When using a parser that outputs with custom timestamps, you may also need to specify
- * the allowedTimestampSkew option.
+ * the allowedTimestampSkew option.</p>
  */
 public class MongoDbGridFSIO {
 
@@ -96,14 +98,14 @@ public class MongoDbGridFSIO {
      * creation timestamp.
      * @param output
      */
-    public void output(T output);
+    void output(T output);
 
     /**
      * Output the object using the specified timestamp.
      * @param output
      * @param timestamp
      */
-    public void output(T output, Instant timestamp);
+    void output(T output, Instant timestamp);
   }
 
   /**
@@ -112,7 +114,7 @@ public class MongoDbGridFSIO {
    * @param <T>
    */
   public interface Parser<T> extends Serializable {
-    public void parse(GridFSDBFile input, ParserCallback<T> callback) throws IOException;
+    void parse(GridFSDBFile input, ParserCallback<T> callback) throws IOException;
   }
 
   /**
@@ -134,71 +136,91 @@ public class MongoDbGridFSIO {
     }
   };
 
-  /** Read data from GridFS. */
+  /** Read data from GridFS. Default behavior with String. */
   public static Read<String> read() {
-    return new Read<String>(new Read.BoundedGridFSSource(null, null, null, null,
-                            null), TEXT_PARSER, StringUtf8Coder.of(), Duration.ZERO);
+    return new AutoValue_MongoDbGridFSIO_Read.Builder<String>().build()
+        .withParser(TEXT_PARSER).withCoder(StringUtf8Coder.of());
   }
 
-  static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+  /**
+   * A {@link PTransform} to read data from MongoDB GridFS.
+   */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+
+    @Nullable abstract String uri();
+    @Nullable abstract String database();
+    @Nullable abstract String bucket();
+    @Nullable abstract Parser<T> parser();
+    @Nullable abstract Coder<T> coder();
+    @Nullable abstract Duration skew();
+    @Nullable abstract String filter();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setUri(String uri);
+      abstract Builder<T> setDatabase(String database);
+      abstract Builder<T> setBucket(String bucket);
+      abstract Builder<T> setParser(Parser<T> parser);
+      abstract Builder<T> setCoder(Coder<T> coder);
+      abstract Builder<T> setSkew(Duration skew);
+      abstract Builder<T> setFilter(String filter);
+      abstract Read<T> build();
+    }
+
     public Read<T> withUri(String uri) {
-      return new Read<T>(new BoundedGridFSSource(uri, options.database,
-                                           options.bucket, options.filterJson,
-                                           null), parser, coder, allowedTimestampSkew);
+      checkNotNull(uri);
+      return toBuilder().setUri(uri).build();
     }
 
     public Read<T> withDatabase(String database) {
-      return new Read<T>(new BoundedGridFSSource(options.uri, database,
-                                           options.bucket, options.filterJson,
-                                           null), parser, coder, allowedTimestampSkew);
+      checkNotNull(database);
+      return toBuilder().setDatabase(database).build();
     }
 
     public Read<T> withBucket(String bucket) {
-      return new Read<T>(new BoundedGridFSSource(options.uri, options.database, bucket,
-          options.filterJson, null), parser, coder, allowedTimestampSkew);
+      checkNotNull(bucket);
+      return toBuilder().setBucket(bucket).build();
     }
 
-    public <X> Read<X> withParser(Parser<X> f) {
-      return withParser(f, null);
-    }
-    public <X> Read<X> withParser(Parser<X> f, Coder<X> coder) {
-      checkNotNull(f, "Parser cannot be null");
-      //coder can be null as it can be set on the output directly
-      return new Read<X>(new BoundedGridFSSource(options.uri, options.database,
-          options.bucket, options.filterJson, null), f, coder, allowedTimestampSkew);
+    public <X> Read<X> withParser(Parser<X> parser) {
+      checkNotNull(parser);
+      Builder<X> builder = (Builder<X>) toBuilder();
+      return builder.setParser(parser).setCoder(null).build();
     }
-    public Read<T> allowedTimestampSkew(Duration skew) {
-      return new Read<T>(new BoundedGridFSSource(options.uri, options.database,
-          options.bucket, options.filterJson, null),
-          parser, coder, skew == null ? Duration.ZERO : skew);
+
+    public Read<T> withCoder(Coder<T> coder) {
+      checkNotNull(coder);
+      return toBuilder().setCoder(coder).build();
     }
 
-    public Read<T> withQueryFilter(String filterJson) {
-      return new Read<T>(new BoundedGridFSSource(options.uri, options.database,
-          options.bucket, filterJson, null), parser, coder, allowedTimestampSkew);
+    public Read<T> withSkew(Duration skew) {
+      return toBuilder().setSkew(skew == null ? Duration.ZERO : skew).build();
     }
 
-    private final BoundedGridFSSource options;
-    private final Parser<T> parser;
-    private final Coder<T> coder;
-    private final Duration allowedTimestampSkew;
-
-    Read(BoundedGridFSSource options, Parser<T> parser,
-        Coder<T> coder, Duration allowedTimestampSkew) {
-      this.options = options;
-      this.parser = parser;
-      this.allowedTimestampSkew = allowedTimestampSkew;
-      this.coder = coder;
+    public Read<T> withFilter(String filter) {
+      return toBuilder().setFilter(filter).build();
     }
 
-    BoundedGridFSSource getSource() {
-      return options;
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.addIfNotNull(DisplayData.item("uri", uri()));
+      builder.addIfNotNull(DisplayData.item("database", database()));
+      builder.addIfNotNull(DisplayData.item("bucket", bucket()));
+      builder.addIfNotNull(DisplayData.item("parser", parser().getClass().getName()));
+      builder.addIfNotNull(DisplayData.item("coder", coder().getClass().getName()));
+      builder.addIfNotNull(DisplayData.item("skew", skew()));
+      builder.addIfNotNull(DisplayData.item("filter", filter()));
     }
 
     @Override
     public PCollection<T> apply(PBegin input) {
+      final BoundedGridFSSource source = new BoundedGridFSSource(this, null);
       org.apache.beam.sdk.io.Read.Bounded<ObjectId> objectIds =
-          org.apache.beam.sdk.io.Read.from(options);
+          org.apache.beam.sdk.io.Read.from(source);
       PCollection<T> output = input.getPipeline().apply(objectIds)
           .apply(ParDo.of(new DoFn<ObjectId, T>() {
             Mongo mongo;
@@ -206,8 +228,8 @@ public class MongoDbGridFSIO {
 
             @Setup
             public void setup() {
-              mongo = options.setupMongo();
-              gridfs = options.setupGridFS(mongo);
+              mongo = source.setupMongo();
+              gridfs = source.setupGridFS(mongo);
             }
 
             @Teardown
@@ -219,7 +241,7 @@ public class MongoDbGridFSIO {
             public void processElement(final ProcessContext c) throws IOException {
               ObjectId oid = c.element();
               GridFSDBFile file = gridfs.find(oid);
-              parser.parse(file, new ParserCallback<T>() {
+              parser().parse(file, new ParserCallback<T>() {
                 @Override
                 public void output(T output, Instant timestamp) {
                   checkNotNull(timestamp);
@@ -235,49 +257,42 @@ public class MongoDbGridFSIO {
 
             @Override
             public Duration getAllowedTimestampSkew() {
-              return allowedTimestampSkew;
+              return skew();
             }
           }));
-      if (coder != null) {
-        output.setCoder(coder);
+      if (coder() != null) {
+        output.setCoder(coder());
       }
       return output;
     }
 
-    static class BoundedGridFSSource extends BoundedSource<ObjectId> {
-      @Nullable
-      private final String uri;
-      @Nullable
-      private final String database;
-      @Nullable
-      private final String bucket;
-      @Nullable
-      private final String filterJson;
+    /**
+     * A {@link BoundedSource} for MongoDB GridFS.
+     */
+    protected static class BoundedGridFSSource extends BoundedSource<ObjectId> {
+
+      private Read spec;
+
       @Nullable
       private List<ObjectId> objectIds;
 
-      BoundedGridFSSource(String uri, String database,
-                          String bucket, String filterJson,
-                          List<ObjectId> objectIds) {
-        this.uri = uri;
-        this.database = database;
-        this.bucket = bucket;
+      BoundedGridFSSource(Read spec, List<ObjectId> objectIds) {
+        this.spec = spec;
         this.objectIds = objectIds;
-        this.filterJson = filterJson;
       }
 
       private Mongo setupMongo() {
-        return uri == null ? new Mongo() : new Mongo(new MongoURI(uri));
+        return spec.uri() == null ? new Mongo() : new Mongo(new MongoURI(spec.uri()));
       }
 
       private GridFS setupGridFS(Mongo mongo) {
-        DB db = database == null ? mongo.getDB("gridfs") : mongo.getDB(database);
-        return bucket == null ? new GridFS(db) : new GridFS(db, bucket);
+        DB db = spec.database() == null ? mongo.getDB("gridfs") : mongo.getDB(spec.database());
+        return spec.bucket() == null ? new GridFS(db) : new GridFS(db, spec.bucket());
       }
 
       private DBCursor createCursor(GridFS gridfs) {
-        if (filterJson != null) {
-          DBObject query = (DBObject) JSON.parse(filterJson);
+        if (spec.filter() != null) {
+          DBObject query = (DBObject) JSON.parse(spec.filter());
           return gridfs.getFileList(query).sort(null);
         }
         return gridfs.getFileList().sort(null);
@@ -297,9 +312,7 @@ public class MongoDbGridFSIO {
             GridFSDBFile file = (GridFSDBFile) cursor.next();
             long len = file.getLength();
             if ((size + len) > desiredBundleSizeBytes && !objects.isEmpty()) {
-              list.add(new BoundedGridFSSource(uri, database, bucket,
-                                                 filterJson,
-                                                 objects));
+              list.add(new BoundedGridFSSource(spec, objects));
               size = 0;
               objects = new ArrayList<>();
             }
@@ -307,9 +320,7 @@ public class MongoDbGridFSIO {
             size += len;
           }
           if (!objects.isEmpty() || list.isEmpty()) {
-            list.add(new BoundedGridFSSource(uri, database, bucket,
-                                             filterJson,
-                                             objects));
+            list.add(new BoundedGridFSSource(spec, objects));
           }
           return list;
         } finally {
@@ -351,11 +362,7 @@ public class MongoDbGridFSIO {
 
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-        builder.addIfNotNull(DisplayData.item("uri", uri));
-        builder.addIfNotNull(DisplayData.item("database", database));
-        builder.addIfNotNull(DisplayData.item("bucket", bucket));
-        builder.addIfNotNull(DisplayData.item("filterJson", filterJson));
+        spec.populateDisplayData(builder);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26474c7e/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 7c2bc28..d5659e9 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -47,16 +47,16 @@ import org.slf4j.LoggerFactory;
 
 /**
  * IO to read and write data on MongoDB.
- * <p>
+ *
  * <h3>Reading from MongoDB</h3>
- * <p>
+ *
  * <p>MongoDbIO source returns a bounded collection of String as {@code PCollection<String>}.
  * The String is the JSON form of the MongoDB Document.</p>
- * <p>
+ *
  * <p>To configure the MongoDB source, you have to provide the connection URI, the database name
  * and the collection name. The following example illustrates various options for configuring the
  * source:</p>
- * <p>
+ *
  * <pre>{@code
  *
  * pipeline.apply(MongoDbIO.read()
@@ -68,17 +68,17 @@ import org.slf4j.LoggerFactory;
  *   // rest of the settings are optional
  *
  * }</pre>
- * <p>
+ *
  * <p>The source also accepts an optional configuration: {@code withFilter()} allows you to
  * define a JSON filter to get subset of data.</p>
- * <p>
+ *
  * <h3>Writing to MongoDB</h3>
- * <p>
+ *
  * <p>MongoDB sink supports writing of Document (as JSON String) in a MongoDB.</p>
- * <p>
+ *
  * <p>To configure a MongoDB sink, you must specify a connection {@code URI}, a {@code Database}
  * name, a {@code Collection} name. For instance:</p>
- * <p>
+ *
  * <pre>{@code
  *
  * pipeline
@@ -91,8 +91,6 @@ import org.slf4j.LoggerFactory;
  *
  * }</pre>
  */
-// TODO instead of JSON String, does it make sense to populate the PCollection with BSON Document or
-//  DBObject ??
 public class MongoDbIO {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbIO.class);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26474c7e/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
index e5e5c8f..5f4d122 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
@@ -75,7 +75,6 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Test on the MongoDbGridFSIO.
  */
@@ -169,7 +168,7 @@ public class MongoDBGridFSIOTest implements Serializable {
     TestPipeline pipeline = TestPipeline.create();
 
     PCollection<String> output = pipeline.apply(
-        MongoDbGridFSIO.read()
+        MongoDbGridFSIO.<String>read()
             .withUri("mongodb://localhost:" + PORT)
             .withDatabase(DATABASE));
 
@@ -199,7 +198,7 @@ public class MongoDBGridFSIOTest implements Serializable {
     TestPipeline pipeline = TestPipeline.create();
 
     PCollection<KV<String, Integer>> output = pipeline.apply(
-        MongoDbGridFSIO.read()
+        MongoDbGridFSIO.<KV<String, Integer>>read()
             .withUri("mongodb://localhost:" + PORT)
             .withDatabase(DATABASE)
             .withBucket("mapBucket")
@@ -223,8 +222,8 @@ public class MongoDBGridFSIOTest implements Serializable {
                 }
               }
             })
-            .allowedTimestampSkew(new Duration(3601000L)))
-            .setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
+            .withSkew(new Duration(3601000L))
+            .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));
 
     PAssert.thatSingleton(output.apply("Count All", Count.<KV<String, Integer>>globally()))
         .isEqualTo(50100L);
@@ -246,11 +245,11 @@ public class MongoDBGridFSIOTest implements Serializable {
   @Test
   public void testSplit() throws Exception {
     PipelineOptions options = PipelineOptionsFactory.create();
-    MongoDbGridFSIO.Read<String> read = MongoDbGridFSIO.read()
+    MongoDbGridFSIO.Read<String> read = MongoDbGridFSIO.<String>read()
         .withUri("mongodb://localhost:" + PORT)
         .withDatabase(DATABASE);
 
-    BoundedGridFSSource src = read.getSource();
+    BoundedGridFSSource src = new BoundedGridFSSource(read, null);
 
     // make sure 2 files can fit in
     long desiredBundleSizeBytes = (src.getEstimatedSizeBytes(options) * 2L) / 5L + 1000;


[2/2] incubator-beam git commit: [BEAM-698] This closes #1054

Posted by jb...@apache.org.
[BEAM-698] This closes #1054


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8130bc36
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8130bc36
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8130bc36

Branch: refs/heads/master
Commit: 8130bc36feca77737a4e171e14307f53410201c7
Parents: f27354f 26474c7
Author: Jean-Baptiste Onofr� <jb...@apache.org>
Authored: Thu Oct 6 14:02:34 2016 +0200
Committer: Jean-Baptiste Onofr� <jb...@apache.org>
Committed: Thu Oct 6 14:02:34 2016 +0200

----------------------------------------------------------------------
 .../beam/sdk/io/mongodb/MongoDbGridFSIO.java    | 197 ++++++++++---------
 .../apache/beam/sdk/io/mongodb/MongoDbIO.java   |  20 +-
 .../sdk/io/mongodb/MongoDBGridFSIOTest.java     |  13 +-
 3 files changed, 117 insertions(+), 113 deletions(-)
----------------------------------------------------------------------