You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2016/11/15 11:06:04 UTC

[1/2] incubator-beam git commit: [BEAM-948] Add ability to write files to GridFS

Repository: incubator-beam
Updated Branches:
  refs/heads/master 13ad8f68a -> 9c300cde8


[BEAM-948] Add ability to write files to GridFS


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/79d5ad9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/79d5ad9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/79d5ad9d

Branch: refs/heads/master
Commit: 79d5ad9d7f629481cf04dd65e83f1b06708e16bc
Parents: 13ad8f6
Author: Daniel Kulp <dk...@apache.org>
Authored: Mon Nov 7 17:21:55 2016 -0500
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Tue Nov 15 11:46:34 2016 +0100

----------------------------------------------------------------------
 .../beam/sdk/io/mongodb/MongoDbGridFSIO.java    | 308 +++++++++++++++++--
 .../sdk/io/mongodb/MongoDBGridFSIOTest.java     |  93 ++++++
 2 files changed, 369 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/79d5ad9d/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
index 8c9a65c..26e2c2f 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.mongodb;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
@@ -27,11 +28,13 @@ import com.mongodb.Mongo;
 import com.mongodb.MongoURI;
 import com.mongodb.gridfs.GridFS;
 import com.mongodb.gridfs.GridFSDBFile;
+import com.mongodb.gridfs.GridFSInputFile;
 import com.mongodb.util.JSON;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -51,6 +54,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
 import org.bson.types.ObjectId;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -84,6 +88,36 @@ import org.joda.time.Instant;
  * the file as the timestamp.
  * When using a parser that outputs with custom timestamps, you may also need to specify
  * the allowedTimestampSkew option.</p>
+ *
+ *
+ *
+ * <h3>Writing to MongoDB via GridFS</h3>
+ *
+ * <p>MongoDBGridFS supports writing of data to a file in a MongoDB GridFS collection.</p>
+ *
+ * <p>To configure a MongoDB GridFS sink, you can provide the connection URI, the database name
+ * and the bucket name.  You must also provide the filename to write to. Another optional parameter
+ * is the GridFS file chunkSize.
+ *
+ * For instance:</p>
+ *
+ * <pre>{@code
+ *
+ * pipeline
+ *   .apply(...)
+ *   .apply(MongoDbGridFSIO.write()
+ *     .withUri("mongodb://localhost:27017")
+ *     .withDatabase("my-database")
+ *     .withBucket("my-bucket")
+ *     .withChunkSize(256000L)
+ *     .withFilename("my-output.txt"))
+ *
+ * }</pre>
+ *
+ * <p>There is also an optional argument to the {@code create()} method to specify a writer
+ * that is used to write the data to the OutputStream.  By default, it writes UTF-8 strings
+ * to the file separated with line feeds.
+ * </p>
  */
 public class MongoDbGridFSIO {
 
@@ -136,19 +170,68 @@ public class MongoDbGridFSIO {
 
   /** Read data from GridFS. Default behavior with String. */
   public static Read<String> read() {
-    return new AutoValue_MongoDbGridFSIO_Read.Builder<String>().build()
-        .withParser(TEXT_PARSER).withCoder(StringUtf8Coder.of());
+    return new AutoValue_MongoDbGridFSIO_Read.Builder<String>()
+        .setParser(TEXT_PARSER)
+        .setCoder(StringUtf8Coder.of())
+        .setConnectionConfiguration(ConnectionConfiguration.create())
+        .build();
   }
 
+  /** Write data to GridFS. Default behavior with String. */
+  public static Write<String> write() {
+    return new AutoValue_MongoDbGridFSIO_Write.Builder<String>()
+        .setConnectionConfiguration(ConnectionConfiguration.create())
+        .setWriteFn(new WriteFn<String>() {
+          @Override
+          public void write(String output, OutputStream outStream) throws IOException {
+            outStream.write(output.getBytes("utf-8"));
+            outStream.write('\n');
+          }
+        }).build();
+  }
+  public static <T> Write<T> write(WriteFn<T> fn) {
+    return new AutoValue_MongoDbGridFSIO_Write.Builder<T>()
+        .setWriteFn(fn)
+        .setConnectionConfiguration(ConnectionConfiguration.create())
+        .build();
+  }
+
+
   /**
-   * A {@link PTransform} to read data from MongoDB GridFS.
+   * Encapsulate the MongoDB GridFS connection logic.
    */
   @AutoValue
-  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
-
+  public abstract static class ConnectionConfiguration implements Serializable {
     @Nullable abstract String uri();
     @Nullable abstract String database();
     @Nullable abstract String bucket();
+
+    static ConnectionConfiguration create() {
+      return new AutoValue_MongoDbGridFSIO_ConnectionConfiguration(null, null, null);
+    }
+    static ConnectionConfiguration create(String uri, String database, String bucket) {
+      return new AutoValue_MongoDbGridFSIO_ConnectionConfiguration(uri, database, bucket);
+    }
+
+
+    Mongo setupMongo() {
+      return uri() == null ? new Mongo() : new Mongo(new MongoURI(uri()));
+    }
+
+    GridFS setupGridFS(Mongo mongo) {
+      DB db = database() == null ? mongo.getDB("gridfs") : mongo.getDB(database());
+      return bucket() == null ? new GridFS(db) : new GridFS(db, bucket());
+    }
+
+  }
+
+  /**
+   * A {@link PTransform} to read data from MongoDB GridFS.
+   */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+
+    abstract ConnectionConfiguration connectionConfiguration();
     @Nullable abstract Parser<T> parser();
     @Nullable abstract Coder<T> coder();
     @Nullable abstract Duration skew();
@@ -158,9 +241,7 @@ public class MongoDbGridFSIO {
 
     @AutoValue.Builder
     abstract static class Builder<T> {
-      abstract Builder<T> setUri(String uri);
-      abstract Builder<T> setDatabase(String database);
-      abstract Builder<T> setBucket(String bucket);
+      abstract Builder<T> setConnectionConfiguration(ConnectionConfiguration connection);
       abstract Builder<T> setParser(Parser<T> parser);
       abstract Builder<T> setCoder(Coder<T> coder);
       abstract Builder<T> setSkew(Duration skew);
@@ -170,17 +251,29 @@ public class MongoDbGridFSIO {
 
     public Read<T> withUri(String uri) {
       checkNotNull(uri);
-      return toBuilder().setUri(uri).build();
+      ConnectionConfiguration config = ConnectionConfiguration
+          .create(uri,
+                  connectionConfiguration().database(),
+                  connectionConfiguration().bucket());
+      return toBuilder().setConnectionConfiguration(config).build();
     }
 
     public Read<T> withDatabase(String database) {
       checkNotNull(database);
-      return toBuilder().setDatabase(database).build();
+      ConnectionConfiguration config = ConnectionConfiguration
+          .create(connectionConfiguration().uri(),
+                  database,
+                  connectionConfiguration().bucket());
+      return toBuilder().setConnectionConfiguration(config).build();
     }
 
     public Read<T> withBucket(String bucket) {
       checkNotNull(bucket);
-      return toBuilder().setBucket(bucket).build();
+      ConnectionConfiguration config = ConnectionConfiguration
+          .create(connectionConfiguration().uri(),
+                  connectionConfiguration().database(),
+                  bucket);
+      return toBuilder().setConnectionConfiguration(config).build();
     }
 
     public <X> Read<X> withParser(Parser<X> parser) {
@@ -205,9 +298,9 @@ public class MongoDbGridFSIO {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      builder.addIfNotNull(DisplayData.item("uri", uri()));
-      builder.addIfNotNull(DisplayData.item("database", database()));
-      builder.addIfNotNull(DisplayData.item("bucket", bucket()));
+      builder.addIfNotNull(DisplayData.item("uri", connectionConfiguration().uri()));
+      builder.addIfNotNull(DisplayData.item("database", connectionConfiguration().database()));
+      builder.addIfNotNull(DisplayData.item("bucket", connectionConfiguration().bucket()));
       builder.addIfNotNull(DisplayData.item("parser", parser().getClass().getName()));
       builder.addIfNotNull(DisplayData.item("coder", coder().getClass().getName()));
       builder.addIfNotNull(DisplayData.item("skew", skew()));
@@ -226,8 +319,8 @@ public class MongoDbGridFSIO {
 
             @Setup
             public void setup() {
-              mongo = source.setupMongo();
-              gridfs = source.setupGridFS(mongo);
+              mongo = source.spec.connectionConfiguration().setupMongo();
+              gridfs = source.spec.connectionConfiguration().setupGridFS(mongo);
             }
 
             @Teardown
@@ -269,24 +362,16 @@ public class MongoDbGridFSIO {
      */
     protected static class BoundedGridFSSource extends BoundedSource<ObjectId> {
 
-      private Read spec;
+      private Read<?> spec;
 
       @Nullable
       private List<ObjectId> objectIds;
 
-      BoundedGridFSSource(Read spec, List<ObjectId> objectIds) {
+      BoundedGridFSSource(Read<?> spec, List<ObjectId> objectIds) {
         this.spec = spec;
         this.objectIds = objectIds;
       }
 
-      private Mongo setupMongo() {
-        return spec.uri() == null ? new Mongo() : new Mongo(new MongoURI(spec.uri()));
-      }
-
-      private GridFS setupGridFS(Mongo mongo) {
-        DB db = spec.database() == null ? mongo.getDB("gridfs") : mongo.getDB(spec.database());
-        return spec.bucket() == null ? new GridFS(db) : new GridFS(db, spec.bucket());
-      }
 
       private DBCursor createCursor(GridFS gridfs) {
         if (spec.filter() != null) {
@@ -299,9 +384,9 @@ public class MongoDbGridFSIO {
       @Override
       public List<? extends BoundedSource<ObjectId>> splitIntoBundles(long desiredBundleSizeBytes,
           PipelineOptions options) throws Exception {
-        Mongo mongo = setupMongo();
+        Mongo mongo = spec.connectionConfiguration().setupMongo();
         try {
-          GridFS gridfs = setupGridFS(mongo);
+          GridFS gridfs = spec.connectionConfiguration().setupGridFS(mongo);
           DBCursor cursor = createCursor(gridfs);
           long size = 0;
           List<BoundedGridFSSource> list = new ArrayList<>();
@@ -328,9 +413,9 @@ public class MongoDbGridFSIO {
 
       @Override
       public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-        Mongo mongo = setupMongo();
+        Mongo mongo = spec.connectionConfiguration().setupMongo();
         try {
-          GridFS gridfs = setupGridFS(mongo);
+          GridFS gridfs = spec.connectionConfiguration().setupGridFS(mongo);
           DBCursor cursor = createCursor(gridfs);
           long size = 0;
           while (cursor.hasNext()) {
@@ -397,8 +482,8 @@ public class MongoDbGridFSIO {
         @Override
         public boolean start() throws IOException {
           if (objects == null) {
-            mongo = source.setupMongo();
-            GridFS gridfs = source.setupGridFS(mongo);
+            mongo = source.spec.connectionConfiguration().setupMongo();
+            GridFS gridfs = source.spec.connectionConfiguration().setupGridFS(mongo);
             cursor = source.createCursor(gridfs);
           } else {
             iterator = objects.iterator();
@@ -446,4 +531,163 @@ public class MongoDbGridFSIO {
       }
     }
   }
+
+
+  /**
+   * Function that is called to write the data to the give GridFS OutputStream.
+   * @param <T>
+   */
+  public interface WriteFn<T> extends Serializable {
+    /**
+     * Output the object to the given OutputStream.
+     * @param output The data to output
+     * @param outStream The OutputStream
+     */
+    void write(T output, OutputStream outStream) throws IOException;
+  }
+
+  /**
+   * A {@link PTransform} to write data to MongoDB GridFS.
+   */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    abstract ConnectionConfiguration connectionConfiguration();
+    @Nullable abstract Long chunkSize();
+    abstract WriteFn<T> writeFn();
+    @Nullable abstract String filename();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setConnectionConfiguration(ConnectionConfiguration connection);
+      abstract Builder<T> setFilename(String filename);
+      abstract Builder<T> setChunkSize(Long chunkSize);
+      abstract Builder<T> setWriteFn(WriteFn<T> fn);
+      abstract Write<T> build();
+    }
+
+    public Write<T> withUri(String uri) {
+      checkNotNull(uri);
+      ConnectionConfiguration config = ConnectionConfiguration
+          .create(uri,
+                  connectionConfiguration().database(),
+                  connectionConfiguration().bucket());
+      return toBuilder().setConnectionConfiguration(config).build();
+    }
+
+    public Write<T> withDatabase(String database) {
+      checkNotNull(database);
+      ConnectionConfiguration config = ConnectionConfiguration
+          .create(connectionConfiguration().uri(),
+                  database,
+                  connectionConfiguration().bucket());
+      return toBuilder().setConnectionConfiguration(config).build();
+    }
+
+    public Write<T> withBucket(String bucket) {
+      checkNotNull(bucket);
+      ConnectionConfiguration config = ConnectionConfiguration
+          .create(connectionConfiguration().uri(),
+                  connectionConfiguration().database(),
+                  bucket);
+      return toBuilder().setConnectionConfiguration(config).build();
+    }
+
+    public Write<T> withFilename(String filename) {
+      checkNotNull(filename);
+      return toBuilder().setFilename(filename).build();
+    }
+
+    public Write<T> withChunkSize(Long chunkSize) {
+      checkNotNull(chunkSize);
+      checkArgument(chunkSize > 1, "Chunk Size must be greater than 1", chunkSize);
+      return toBuilder().setChunkSize(chunkSize).build();
+    }
+
+    public void validate(T input) {
+      checkNotNull(filename(), "filename");
+      checkNotNull(writeFn(), "writeFn");
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.addIfNotNull(DisplayData.item("uri", connectionConfiguration().uri()));
+      builder.addIfNotNull(DisplayData.item("database", connectionConfiguration().database()));
+      builder.addIfNotNull(DisplayData.item("bucket", connectionConfiguration().bucket()));
+      builder.addIfNotNull(DisplayData.item("chunkSize", chunkSize()));
+      builder.addIfNotNull(DisplayData.item("filename", filename()));
+    }
+
+    @Override
+    public PDone apply(PCollection<T> input) {
+      input.apply(ParDo.of(new GridFsWriteFn<T>(this)));
+      return PDone.in(input.getPipeline());
+    }
+  }
+  private static class GridFsWriteFn<T> extends DoFn<T, Void> {
+
+    private final Write<T> spec;
+
+    private Mongo mongo;
+    private GridFS gridfs;
+
+    private GridFSInputFile gridFsFile;
+    private OutputStream outputStream;
+
+    public GridFsWriteFn(Write<T> spec) {
+      this.spec = spec;
+    }
+
+    @Setup
+    public void setup() throws Exception {
+      mongo = spec.connectionConfiguration().setupMongo();
+      gridfs = spec.connectionConfiguration().setupGridFS(mongo);
+    }
+
+    @StartBundle
+    public void startBundle(Context context) {
+      gridFsFile = gridfs.createFile(spec.filename());
+      if (spec.chunkSize() != null) {
+        gridFsFile.setChunkSize(spec.chunkSize());
+      }
+      outputStream = gridFsFile.getOutputStream();
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) throws Exception {
+      T record = context.element();
+      spec.writeFn().write(record, outputStream);
+    }
+
+    @FinishBundle
+    public void finishBundle(Context context) throws Exception {
+      if (gridFsFile != null) {
+        outputStream.flush();
+        outputStream.close();
+        outputStream = null;
+        gridFsFile = null;
+      }
+    }
+
+    @Teardown
+    public void teardown() throws Exception {
+      try {
+        if (gridFsFile != null) {
+          outputStream.flush();
+          outputStream.close();
+          outputStream = null;
+          gridFsFile = null;
+        }
+      } finally {
+        if (mongo != null) {
+          mongo.close();
+          mongo = null;
+          gridfs = null;
+        }
+      }
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/79d5ad9d/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
index dc27ee2..5061487 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.mongodb;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import com.mongodb.DB;
 import com.mongodb.Mongo;
@@ -40,22 +41,27 @@ import de.flapdoodle.embed.process.runtime.Network;
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.net.ServerSocket;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.Scanner;
 
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.mongodb.MongoDbGridFSIO.Read.BoundedGridFSSource;
+import org.apache.beam.sdk.io.mongodb.MongoDbGridFSIO.WriteFn;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.NeedsRunner;
@@ -63,6 +69,7 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.KV;
@@ -283,4 +290,90 @@ public class MongoDBGridFSIOTest implements Serializable {
     assertEquals(5, count);
   }
 
+
+
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteMessage() throws Exception {
+
+    Pipeline pipeline = TestPipeline.create();
+
+    ArrayList<String> data = new ArrayList<>(100);
+    ArrayList<Integer> intData = new ArrayList<>(100);
+    for (int i = 0; i < 1000; i++) {
+      data.add("Message " + i);
+    }
+    for (int i = 0; i < 100; i++) {
+      intData.add(i);
+    }
+    pipeline.apply("String", Create.of(data))
+        .apply("StringInternal", MongoDbGridFSIO.write()
+            .withUri("mongodb://localhost:" + port)
+            .withDatabase(DATABASE)
+            .withChunkSize(100L)
+            .withBucket("WriteTest")
+            .withFilename("WriteTestData"));
+
+    pipeline.apply("WithWriteFn", Create.of(intData))
+      .apply("WithWriteFnInternal", MongoDbGridFSIO.write(new WriteFn<Integer>() {
+        @Override
+        public void write(Integer output, OutputStream outStream) throws IOException {
+          //one byte per output
+          outStream.write(output.byteValue());
+        }
+      })
+        .withUri("mongodb://localhost:" + port)
+        .withDatabase(DATABASE)
+        .withBucket("WriteTest")
+        .withFilename("WriteTestIntData"));
+
+    pipeline.run();
+
+    Mongo client = null;
+    try {
+      StringBuilder results = new StringBuilder();
+      client = new Mongo("localhost", port);
+      DB database = client.getDB(DATABASE);
+      GridFS gridfs = new GridFS(database, "WriteTest");
+      List<GridFSDBFile> files = gridfs.find("WriteTestData");
+      assertTrue(files.size() > 0);
+      for (GridFSDBFile file : files) {
+        assertEquals(100,  file.getChunkSize());
+        int l = (int) file.getLength();
+        try (InputStream ins = file.getInputStream()) {
+          DataInputStream dis = new DataInputStream(ins);
+          byte b[] = new byte[l];
+          dis.readFully(b);
+          results.append(new String(b, "utf-8"));
+        }
+      }
+      String dataString = results.toString();
+      for (int x = 0; x < 1000; x++) {
+        assertTrue(dataString.contains("Message " + x));
+      }
+
+      files = gridfs.find("WriteTestIntData");
+      boolean intResults[] = new boolean[100];
+      for (GridFSDBFile file : files) {
+        int l = (int) file.getLength();
+        try (InputStream ins = file.getInputStream()) {
+          DataInputStream dis = new DataInputStream(ins);
+          byte b[] = new byte[l];
+          dis.readFully(b);
+          for (int x = 0; x < b.length; x++) {
+            intResults[b[x]] = true;
+          }
+        }
+      }
+
+      for (int x = 0; x < 100; x++) {
+        assertTrue("Did not get a result for " + x, intResults[x]);
+      }
+    } finally {
+      if (client != null) {
+        client.close();
+      }
+    }
+  }
 }


[2/2] incubator-beam git commit: [BEAM-948] This closes #1324

Posted by jb...@apache.org.
[BEAM-948] This closes #1324


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9c300cde
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9c300cde
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9c300cde

Branch: refs/heads/master
Commit: 9c300cde8cca3e2c0ccdf9e87cbf4946c4199517
Parents: 13ad8f6 79d5ad9
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Tue Nov 15 12:05:20 2016 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Tue Nov 15 12:05:20 2016 +0100

----------------------------------------------------------------------
 .../beam/sdk/io/mongodb/MongoDbGridFSIO.java    | 308 +++++++++++++++++--
 .../sdk/io/mongodb/MongoDBGridFSIOTest.java     |  93 ++++++
 2 files changed, 369 insertions(+), 32 deletions(-)
----------------------------------------------------------------------