Posted to commits@beam.apache.org by jb...@apache.org on 2016/10/02 14:11:11 UTC

[1/2] incubator-beam git commit: [BEAM-698] Use AutoValue and deal with Document instead of String in MongoDbIO

Repository: incubator-beam
Updated Branches:
  refs/heads/master 2e0adaf02 -> 202acd1d6


[BEAM-698] Use AutoValue and deal with Document instead of String in MongoDbIO


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4d95423b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4d95423b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4d95423b

Branch: refs/heads/master
Commit: 4d95423bfe5ede1c48fea9489a18c17079691088
Parents: 2e0adaf
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Sep 28 17:17:22 2016 -0700
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun Oct 2 15:49:49 2016 +0200

----------------------------------------------------------------------
 sdks/java/io/mongodb/pom.xml                    |   6 +
 .../apache/beam/sdk/io/mongodb/MongoDbIO.java   | 276 ++++++++-----------
 .../beam/sdk/io/mongodb/MongoDbIOTest.java      |  19 +-
 3 files changed, 132 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
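
In short, both transforms now produce and consume org.bson.Document instead
of JSON strings. A usage sketch of the reworked API follows; the URI,
database, collection, and filter values are illustrative, the imports are
file-level, and the statements are assumed to sit inside a method such as
main():

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.mongodb.MongoDbIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;
    import org.bson.Document;

    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

    // Read matching documents; the pipeline now carries Documents directly,
    // with no JSON parse/serialize round trip at the IO boundary.
    PCollection<Document> einsteins = pipeline.apply(
        MongoDbIO.read()
            .withUri("mongodb://localhost:27017")
            .withDatabase("beam")
            .withCollection("scientists")
            .withFilter("{\"scientist\":\"Einstein\"}"));

    // Write them back out; inserts are buffered and flushed in batches.
    einsteins.apply(
        MongoDbIO.write()
            .withUri("mongodb://localhost:27017")
            .withDatabase("beam")
            .withCollection("einsteins")
            .withBatchSize(512L));

    pipeline.run();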


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d95423b/sdks/java/io/mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/pom.xml b/sdks/java/io/mongodb/pom.xml
index b7e36af..5555173 100644
--- a/sdks/java/io/mongodb/pom.xml
+++ b/sdks/java/io/mongodb/pom.xml
@@ -94,6 +94,12 @@
       <artifactId>joda-time</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
     <!-- test dependencies -->
     <dependency>
       <groupId>de.flapdoodle.embed</groupId>
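
For context, auto-value is an annotation processor, so provided scope is
enough: it runs at compile time and never reaches the runtime classpath. A
minimal sketch of the pattern the main change adopts (Config and its
properties are made-up names, not Beam code):

    import com.google.auto.value.AutoValue;

    @AutoValue
    abstract class Config {
      abstract String uri();
      abstract int numSplits();

      // The processor generates AutoValue_Config, supplying equals(),
      // hashCode(), toString(), and this builder's implementation.
      static Builder builder() {
        return new AutoValue_Config.Builder();
      }

      abstract Builder toBuilder();  // enables immutable withXxx()-style copies

      @AutoValue.Builder
      abstract static class Builder {
        abstract Builder setUri(String uri);
        abstract Builder setNumSplits(int numSplits);
        abstract Config build();
      }
    }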

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d95423b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index a54694a..7c2bc28 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -17,9 +17,10 @@
  */
 package org.apache.beam.sdk.io.mongodb;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.common.annotations.VisibleForTesting;
+import com.google.auto.value.AutoValue;
 import com.mongodb.BasicDBObject;
 import com.mongodb.MongoClient;
 import com.mongodb.MongoClientURI;
@@ -98,12 +99,12 @@ public class MongoDbIO {
 
   /** Read data from MongoDB. */
   public static Read read() {
-    return new Read(new BoundedMongoDbSource(null, null, null, null, 0));
+    return new AutoValue_MongoDbIO_Read.Builder().setNumSplits(0).build();
   }
 
   /** Write data to MongoDB. */
   public static Write write() {
-    return new Write(new Write.MongoDbWriter(null, null, null, 1024L));
+    return new AutoValue_MongoDbIO_Write.Builder().setBatchSize(1024L).build();
   }
 
   private MongoDbIO() {
@@ -112,118 +113,97 @@ public class MongoDbIO {
   /**
    * A {@link PTransform} to read data from MongoDB.
    */
-  public static class Read extends PTransform<PBegin, PCollection<String>> {
+  @AutoValue
+  public abstract static class Read extends PTransform<PBegin, PCollection<Document>> {
+    @Nullable abstract String uri();
+    @Nullable abstract String database();
+    @Nullable abstract String collection();
+    @Nullable abstract String filter();
+    abstract int numSplits();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setUri(String uri);
+      abstract Builder setDatabase(String database);
+      abstract Builder setCollection(String collection);
+      abstract Builder setFilter(String filter);
+      abstract Builder setNumSplits(int numSplits);
+      abstract Read build();
+    }
 
+    /**
+     * Sets the connection URI used to read from MongoDB.
+     */
     public Read withUri(String uri) {
-      return new Read(source.withUri(uri));
+      checkNotNull(uri);
+      return toBuilder().setUri(uri).build();
     }
 
     public Read withDatabase(String database) {
-      return new Read(source.withDatabase(database));
+      checkNotNull(database);
+      return toBuilder().setDatabase(database).build();
     }
 
     public Read withCollection(String collection) {
-      return new Read(source.withCollection(collection));
+      checkNotNull(collection);
+      return toBuilder().setCollection(collection).build();
     }
 
     public Read withFilter(String filter) {
-      return new Read(source.withFilter(filter));
+      checkNotNull(filter);
+      return toBuilder().setFilter(filter).build();
     }
 
     public Read withNumSplits(int numSplits) {
-      return new Read(source.withNumSplits(numSplits));
-    }
-
-    private final BoundedMongoDbSource source;
-
-    private Read(BoundedMongoDbSource source) {
-      this.source = source;
+      checkArgument(numSplits >= 0);
+      return toBuilder().setNumSplits(numSplits).build();
     }
 
     @Override
-    public PCollection<String> apply(PBegin input) {
-      return input.apply(org.apache.beam.sdk.io.Read.from(getSource()));
-    }
-
-    /**
-     * Creates a {@link BoundedSource} with the configuration in {@link Read}.
-     */
-    @VisibleForTesting
-    BoundedSource<String> getSource() {
-      return source;
+    public PCollection<Document> apply(PBegin input) {
+      return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedMongoDbSource(this)));
     }
 
     @Override
     public void validate(PBegin input) {
-      source.validate();
+      checkNotNull(uri(), "uri");
+      checkNotNull(database(), "database");
+      checkNotNull(collection(), "collection");
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-
-      source.populateDisplayData(builder);
+      builder.add(DisplayData.item("uri", uri()));
+      builder.add(DisplayData.item("database", database()));
+      builder.add(DisplayData.item("collection", collection()));
+      builder.addIfNotNull(DisplayData.item("filter", filter()));
+      builder.add(DisplayData.item("numSplit", numSplits()));
     }
-
   }
 
-  private static class BoundedMongoDbSource extends BoundedSource<String> {
-
-    public BoundedMongoDbSource withUri(String uri) {
-      return new BoundedMongoDbSource(uri, database, collection, filter, numSplits);
-    }
-
-    public BoundedMongoDbSource withDatabase(String database) {
-      return new BoundedMongoDbSource(uri, database, collection, filter, numSplits);
-    }
-
-    public BoundedMongoDbSource withCollection(String collection) {
-      return new BoundedMongoDbSource(uri, database, collection, filter, numSplits);
-    }
+  private static class BoundedMongoDbSource extends BoundedSource<Document> {
+    private Read spec;
 
-    public BoundedMongoDbSource withFilter(String filter) {
-      return new BoundedMongoDbSource(uri, database, collection, filter, numSplits);
-    }
-
-    public BoundedMongoDbSource withNumSplits(int numSplits) {
-      return new BoundedMongoDbSource(uri, database, collection, filter, numSplits);
-    }
-
-    private final String uri;
-    private final String database;
-    private final String collection;
-    @Nullable
-    private final String filter;
-    private final int numSplits;
-
-    public BoundedMongoDbSource(String uri, String database, String collection, String filter,
-                                int numSplits) {
-      this.uri = uri;
-      this.database = database;
-      this.collection = collection;
-      this.filter = filter;
-      this.numSplits = numSplits;
+    private BoundedMongoDbSource(Read spec) {
+      this.spec = spec;
     }
 
     @Override
-    public Coder getDefaultOutputCoder() {
-      return SerializableCoder.of(String.class);
+    public Coder<Document> getDefaultOutputCoder() {
+      return SerializableCoder.of(Document.class);
     }
 
     @Override
     public void validate() {
-      checkNotNull(uri, "uri");
-      checkNotNull(database, "database");
-      checkNotNull(collection, "collection");
+      spec.validate(null);
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
-      builder.add(DisplayData.item("uri", uri));
-      builder.add(DisplayData.item("database", database));
-      builder.add(DisplayData.item("collection", collection));
-      builder.addIfNotNull(DisplayData.item("filter", filter));
-      builder.add(DisplayData.item("numSplit", numSplits));
+      spec.populateDisplayData(builder);
     }
 
     @Override
@@ -232,39 +212,35 @@ public class MongoDbIO {
     }
 
     @Override
-    public BoundedReader createReader(PipelineOptions options) {
+    public BoundedReader<Document> createReader(PipelineOptions options) {
       return new BoundedMongoDbReader(this);
     }
 
     @Override
     public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) {
-      long estimatedSizeBytes = 0L;
-
       MongoClient mongoClient = new MongoClient();
-      MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
-      MongoCollection mongoCollection = mongoDatabase.getCollection(collection);
+      MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database());
 
       // get the Mongo collStats object
       // it gives the size for the entire collection
       BasicDBObject stat = new BasicDBObject();
-      stat.append("collStats", collection);
+      stat.append("collStats", spec.collection());
       Document stats = mongoDatabase.runCommand(stat);
-      estimatedSizeBytes = Long.valueOf(stats.get("size").toString());
-      return estimatedSizeBytes;
+      return Long.valueOf(stats.get("size").toString());
     }
 
     @Override
-    public List<BoundedSource<String>> splitIntoBundles(long desiredBundleSizeBytes,
+    public List<BoundedSource<Document>> splitIntoBundles(long desiredBundleSizeBytes,
                                                 PipelineOptions options) {
       MongoClient mongoClient = new MongoClient();
-      MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
+      MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database());
 
-      List<Document> splitKeys = null;
-      if (numSplits > 0) {
+      List<Document> splitKeys;
+      if (spec.numSplits() > 0) {
         // the user defines his desired number of splits
         // calculate the batch size
         long estimatedSizeBytes = getEstimatedSizeBytes(options);
-        desiredBundleSizeBytes = estimatedSizeBytes / numSplits;
+        desiredBundleSizeBytes = estimatedSizeBytes / spec.numSplits();
       }
 
       // the desired batch size is small, using default chunk size of 1MB
@@ -275,7 +251,7 @@ public class MongoDbIO {
       // now we have the batch size (provided by user or provided by the runner)
       // we use Mongo splitVector command to get the split keys
       BasicDBObject splitVectorCommand = new BasicDBObject();
-      splitVectorCommand.append("splitVector", database + "." + collection);
+      splitVectorCommand.append("splitVector", spec.database() + "." + spec.collection());
       splitVectorCommand.append("keyPattern", new BasicDBObject().append("_id", 1));
       splitVectorCommand.append("force", false);
       // maxChunkSize is the Mongo partition size in MB
@@ -284,7 +260,7 @@ public class MongoDbIO {
       Document splitVectorCommandResult = mongoDatabase.runCommand(splitVectorCommand);
       splitKeys = (List<Document>) splitVectorCommandResult.get("splitKeys");
 
-      List<BoundedSource<String>> sources = new ArrayList<>();
+      List<BoundedSource<Document>> sources = new ArrayList<>();
       if (splitKeys.size() < 1) {
        LOGGER.debug("Split keys size is low, using a single source");
         sources.add(this);
@@ -292,8 +268,8 @@ public class MongoDbIO {
       }
 
       LOGGER.debug("Number of splits is {}", splitKeys.size());
-      for (String shardFilter : splitKeysToFilters(splitKeys, filter)) {
-        sources.add(this.withFilter(shardFilter));
+      for (String shardFilter : splitKeysToFilters(splitKeys, spec.filter())) {
+        sources.add(new BoundedMongoDbSource(spec.withFilter(shardFilter)));
       }
 
       return sources;
@@ -362,13 +338,12 @@ public class MongoDbIO {
     }
   }
 
-  private static class BoundedMongoDbReader extends BoundedSource.BoundedReader<String> {
-
+  private static class BoundedMongoDbReader extends BoundedSource.BoundedReader<Document> {
     private final BoundedMongoDbSource source;
 
     private MongoClient client;
     private MongoCursor<Document> cursor;
-    private String current;
+    private Document current;
 
     public BoundedMongoDbReader(BoundedMongoDbSource source) {
       this.source = source;
@@ -376,16 +351,17 @@ public class MongoDbIO {
 
     @Override
     public boolean start() {
-      client = new MongoClient(new MongoClientURI(source.uri));
+      Read spec = source.spec;
+      client = new MongoClient(new MongoClientURI(spec.uri()));
 
-      MongoDatabase mongoDatabase = client.getDatabase(source.database);
+      MongoDatabase mongoDatabase = client.getDatabase(spec.database());
 
-      MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(source.collection);
+      MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(spec.collection());
 
-      if (source.filter == null) {
+      if (spec.filter() == null) {
         cursor = mongoCollection.find().iterator();
       } else {
-        Document bson = Document.parse(source.filter);
+        Document bson = Document.parse(spec.filter());
         cursor = mongoCollection.find(bson).iterator();
       }
 
@@ -395,7 +371,7 @@ public class MongoDbIO {
     @Override
     public boolean advance() {
       if (cursor.hasNext()) {
-        current = cursor.next().toJson();
+        current = cursor.next();
         return true;
       } else {
         return false;
@@ -403,12 +379,12 @@ public class MongoDbIO {
     }
 
     @Override
-    public BoundedSource getCurrentSource() {
+    public BoundedMongoDbSource getCurrentSource() {
       return source;
     }
 
     @Override
-    public String getCurrent() {
+    public Document getCurrent() {
       return current;
     }
 
@@ -433,84 +409,66 @@ public class MongoDbIO {
   /**
    * A {@link PTransform} to write to a MongoDB database.
    */
-  public static class Write extends PTransform<PCollection<String>, PDone> {
+  @AutoValue
+  public abstract static class Write extends PTransform<PCollection<Document>, PDone> {
+    @Nullable abstract String uri();
+    @Nullable abstract String database();
+    @Nullable abstract String collection();
+    abstract long batchSize();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setUri(String uri);
+      abstract Builder setDatabase(String database);
+      abstract Builder setCollection(String collection);
+      abstract Builder setBatchSize(long batchSize);
+      abstract Write build();
+    }
 
     public Write withUri(String uri) {
-      return new Write(writer.withUri(uri));
+      return toBuilder().setUri(uri).build();
     }
 
     public Write withDatabase(String database) {
-      return new Write(writer.withDatabase(database));
+      return toBuilder().setDatabase(database).build();
     }
 
     public Write withCollection(String collection) {
-      return new Write(writer.withCollection(collection));
+      return toBuilder().setCollection(collection).build();
     }
 
     public Write withBatchSize(long batchSize) {
-      return new Write(writer.withBatchSize(batchSize));
-    }
-
-    private final MongoDbWriter writer;
-
-    private Write(MongoDbWriter writer) {
-      this.writer = writer;
+      return toBuilder().setBatchSize(batchSize).build();
     }
 
     @Override
-    public PDone apply(PCollection<String> input) {
-      input.apply(ParDo.of(writer));
+    public PDone apply(PCollection<Document> input) {
+      input.apply(ParDo.of(new WriteFn(this)));
       return PDone.in(input.getPipeline());
     }
 
     @Override
-    public void validate(PCollection<String> input) {
-      writer.validate();
+    public void validate(PCollection<Document> input) {
+      checkNotNull(uri(), "uri");
+      checkNotNull(database(), "database");
+      checkNotNull(collection(), "collection");
+      checkArgument(batchSize() > 0, "batchSize must be positive");
     }
 
-    private static class MongoDbWriter extends DoFn<String, Void> {
-
-      private final String uri;
-      private final String database;
-      private final String collection;
-      private final long batchSize;
-
+    private static class WriteFn extends DoFn<Document, Void> {
+      private final Write spec;
       private MongoClient client;
       private List<Document> batch;
 
-      public MongoDbWriter(String uri, String database, String collection, long batchSize) {
-        this.uri = uri;
-        this.database = database;
-        this.collection = collection;
-        this.batchSize = batchSize;
-      }
-
-      public MongoDbWriter withUri(String uri) {
-        return new MongoDbWriter(uri, database, collection, batchSize);
-      }
-
-      public MongoDbWriter withDatabase(String database) {
-        return new MongoDbWriter(uri, database, collection, batchSize);
-      }
-
-      public MongoDbWriter withCollection(String collection) {
-        return new MongoDbWriter(uri, database, collection, batchSize);
-      }
-
-      public MongoDbWriter withBatchSize(long batchSize) {
-        return new MongoDbWriter(uri, database, collection, batchSize);
-      }
-
-      public void validate() {
-        checkNotNull(uri, "uri");
-        checkNotNull(database, "database");
-        checkNotNull(collection, "collection");
-        checkNotNull(batchSize, "batchSize");
+      public WriteFn(Write spec) {
+        this.spec = spec;
       }
 
       @Setup
       public void createMongoClient() throws Exception {
-        client = new MongoClient(new MongoClientURI(uri));
+        client = new MongoClient(new MongoClientURI(spec.uri()));
       }
 
       @StartBundle
@@ -520,18 +478,18 @@ public class MongoDbIO {
 
       @ProcessElement
       public void processElement(ProcessContext ctx) throws Exception {
-        String value = ctx.element();
-
-        batch.add(Document.parse(ctx.element()));
-        if (batch.size() >= batchSize) {
+        // Need to copy the document because mongoCollection.insertMany() will mutate it
+        // before inserting (will assign an id).
+        batch.add(new Document(ctx.element()));
+        if (batch.size() >= spec.batchSize()) {
           finishBundle(ctx);
         }
       }
 
       @FinishBundle
       public void finishBundle(Context ctx) throws Exception {
-        MongoDatabase mongoDatabase = client.getDatabase(database);
-        MongoCollection mongoCollection = mongoDatabase.getCollection(collection);
+        MongoDatabase mongoDatabase = client.getDatabase(spec.database());
+        MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(spec.collection());
 
         mongoCollection.insertMany(batch);
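
The copy in processElement above is worth a standalone illustration: the
Mongo driver assigns an "_id" to each inserted Document in place, and Beam
requires input elements to remain immutable. A small sketch of the behavior
(the literal values are illustrative):

    import org.bson.Document;

    Document element = Document.parse("{\"scientist\":\"Curie\"}");
    // Document implements Map<String, Object>, so this constructor makes an
    // independent (shallow) copy of the key/value pairs.
    Document copy = new Document(element);
    copy.put("_id", "driver-generated-id");  // stands in for insertMany()'s mutation
    // element still has no "_id": the pipeline's input element is untouched.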
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d95423b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
index 308e071..9a88267 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
@@ -127,20 +127,19 @@ public class MongoDbIOTest implements Serializable {
   public void testFullRead() throws Exception {
     TestPipeline pipeline = TestPipeline.create();
 
-    PCollection<String> output = pipeline.apply(
+    PCollection<Document> output = pipeline.apply(
         MongoDbIO.read()
           .withUri("mongodb://localhost:" + PORT)
           .withDatabase(DATABASE)
           .withCollection(COLLECTION));
 
-    PAssert.thatSingleton(output.apply("Count All", Count.<String>globally()))
+    PAssert.thatSingleton(output.apply("Count All", Count.<Document>globally()))
         .isEqualTo(1000L);
 
     PAssert.that(output
-        .apply("Map Scientist", MapElements.via(new SimpleFunction<String, KV<String, Void>>() {
-          public KV<String, Void> apply(String input) {
-            Document bson = Document.parse(input);
-            return KV.of(bson.getString("scientist"), null);
+        .apply("Map Scientist", MapElements.via(new SimpleFunction<Document, KV<String, Void>>() {
+          public KV<String, Void> apply(Document input) {
+            return KV.of(input.getString("scientist"), null);
           }
         }))
         .apply("Count Scientist", Count.<String, Void>perKey())
@@ -162,14 +161,14 @@ public class MongoDbIOTest implements Serializable {
   public void testReadWithFilter() throws Exception {
     TestPipeline pipeline = TestPipeline.create();
 
-    PCollection<String> output = pipeline.apply(
+    PCollection<Document> output = pipeline.apply(
         MongoDbIO.read()
         .withUri("mongodb://localhost:" + PORT)
         .withDatabase(DATABASE)
         .withCollection(COLLECTION)
         .withFilter("{\"scientist\":\"Einstein\"}"));
 
-    PAssert.thatSingleton(output.apply("Count", Count.<String>globally()))
+    PAssert.thatSingleton(output.apply("Count", Count.<Document>globally()))
         .isEqualTo(100L);
 
     pipeline.run();
@@ -180,9 +179,9 @@ public class MongoDbIOTest implements Serializable {
   public void testWrite() throws Exception {
     TestPipeline pipeline = TestPipeline.create();
 
-    ArrayList<String> data = new ArrayList<>();
+    ArrayList<Document> data = new ArrayList<>();
     for (int i = 0; i < 10000; i++) {
-      data.add(String.format("{\"scientist\":\"Test %s\"}", i));
+      data.add(Document.parse(String.format("{\"scientist\":\"Test %s\"}", i)));
     }
     pipeline.apply(Create.of(data))
         .apply(MongoDbIO.write().withUri("mongodb://localhost:" + PORT).withDatabase("test")
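
Finally, a note on the read-side split logic: splitIntoBundles feeds the
splitVector result through splitKeysToFilters, whose body falls outside the
hunks quoted above. A hedged sketch of one way such per-bundle "_id" range
filters can be built; this is an illustration, not the committed
implementation:

    import java.util.ArrayList;
    import java.util.List;
    import org.bson.Document;

    // Helper methods: turn N split keys into N+1 half-open "_id" ranges that
    // cover the whole collection. (The committed code already falls back to a
    // single source when splitVector returns no keys.)
    static List<String> toRangeFilters(List<Document> splitKeys) {
      List<String> filters = new ArrayList<>();
      Object lower = null;
      for (Document key : splitKeys) {
        Object upper = key.get("_id");
        filters.add(rangeFilter(lower, upper));
        lower = upper;
      }
      filters.add(rangeFilter(lower, null));  // tail range: >= last split key
      return filters;
    }

    static String rangeFilter(Object gte, Object lt) {
      StringBuilder sb = new StringBuilder("{ \"_id\": {");
      if (gte != null) {
        sb.append(" \"$gte\": \"").append(gte).append("\"");
        if (lt != null) {
          sb.append(",");
        }
      }
      if (lt != null) {
        sb.append(" \"$lt\": \"").append(lt).append("\"");
      }
      return sb.append(" } }").toString();
    }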


[2/2] incubator-beam git commit: [BEAM-698] This closes #1033

Posted by jb...@apache.org.
[BEAM-698] This closes #1033


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/202acd1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/202acd1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/202acd1d

Branch: refs/heads/master
Commit: 202acd1d6196b12b0d189b947ef202e32581cbed
Parents: 2e0adaf 4d95423
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Sun Oct 2 15:51:40 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun Oct 2 15:51:40 2016 +0200

----------------------------------------------------------------------
 sdks/java/io/mongodb/pom.xml                    |   6 +
 .../apache/beam/sdk/io/mongodb/MongoDbIO.java   | 276 ++++++++-----------
 .../beam/sdk/io/mongodb/MongoDbIOTest.java      |  19 +-
 3 files changed, 132 insertions(+), 169 deletions(-)
----------------------------------------------------------------------