Posted to commits@beam.apache.org by jb...@apache.org on 2016/09/28 15:55:07 UTC

[1/2] incubator-beam git commit: [BEAM-674] Add GridFS support to MongoDbIO

Repository: incubator-beam
Updated Branches:
  refs/heads/master 3879db036 -> 307d592d2


[BEAM-674] Add GridFS support to MongoDbIO


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/68c8c787
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/68c8c787
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/68c8c787

Branch: refs/heads/master
Commit: 68c8c7872720f4e8fbcd017032c0e90e395e905c
Parents: 3879db0
Author: Daniel Kulp <dk...@apache.org>
Authored: Fri Sep 16 16:58:56 2016 -0400
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Sep 28 17:18:46 2016 +0200

----------------------------------------------------------------------
 sdks/java/io/mongodb/pom.xml                    |   6 +-
 .../beam/sdk/io/mongodb/MongoDbGridFSIO.java    | 427 +++++++++++++++++++
 .../sdk/io/mongodb/MongoDBGridFSIOTest.java     | 257 +++++++++++
 3 files changed, 689 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68c8c787/sdks/java/io/mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/pom.xml b/sdks/java/io/mongodb/pom.xml
index 60f1d1e..b7e36af 100644
--- a/sdks/java/io/mongodb/pom.xml
+++ b/sdks/java/io/mongodb/pom.xml
@@ -89,6 +89,10 @@
       <artifactId>mongo-java-driver</artifactId>
       <version>${mongo-java-driver.version}</version>
     </dependency>
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
 
     <!-- test dependencies -->
     <dependency>
@@ -126,4 +130,4 @@
     </dependency>
   </dependencies>
 
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68c8c787/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
new file mode 100644
index 0000000..337e5f5
--- /dev/null
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.mongodb.DB;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.Mongo;
+import com.mongodb.MongoURI;
+import com.mongodb.gridfs.GridFS;
+import com.mongodb.gridfs.GridFSDBFile;
+import com.mongodb.util.JSON;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.bson.types.ObjectId;
+import org.joda.time.Instant;
+
+
+/**
+ * IO to read data from MongoDB GridFS.
+ *
+ * <h3>Reading from MongoDB via GridFS</h3>
+ *
+ * <p>The MongoDbGridFSIO source returns a bounded collection of Strings as a
+ * {@code PCollection<String>}.</p>
+ *
+ * <p>To configure the MongoDB source, you have to provide the connection URI, the database name
+ * and the bucket name. The following example illustrates various options for configuring the
+ * source:</p>
+ *
+ * <pre>{@code
+ *
+ * pipeline.apply(MongoDbGridFSIO.read()
+ *   .withUri("mongodb://localhost:27017")
+ *   .withDatabase("my-database")
+ *   .withBucket("my-bucket"))
+ *
+ * }</pre>
+ *
+ * <p>The source also accepts an optional configuration: {@code withQueryFilter()} allows you to
+ * define a JSON filter to get a subset of the files in the database.</p>
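+ *
+ * <p>For instance, to only read the files whose name matches a given value (an illustrative
+ * filter; any valid JSON query on the GridFS files collection can be used):</p>
+ *
+ * <pre>{@code
+ *
+ * pipeline.apply(MongoDbGridFSIO.read()
+ *   .withUri("mongodb://localhost:27017")
+ *   .withDatabase("my-database")
+ *   .withBucket("my-bucket")
+ *   // illustrative filter on the "filename" field of the files collection
+ *   .withQueryFilter("{\"filename\": \"my-file\"}"))
+ *
+ * }</pre>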
+ *
+ * <p>An optional {@code ParseCallback} can also be specified to parse the file's InputStream
+ * into objects usable with Beam. By default, MongoDbGridFSIO parses the content into Strings,
+ * splitting on line breaks and using the uploadDate of the file as the timestamp.</p>
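+ *
+ * <p>A sketch of a custom parser setup (where {@code myParseCallback} and {@code myCoder} are
+ * user-supplied, illustrative names): since {@code withParsingFn()} resets the coder, an
+ * explicit coder matching the parsed type should be set as well:</p>
+ *
+ * <pre>{@code
+ *
+ * pipeline.apply(MongoDbGridFSIO.read()
+ *   .withUri("mongodb://localhost:27017")
+ *   .withDatabase("my-database")
+ *   .withBucket("my-bucket")
+ *   // myParseCallback implements ParseCallback<T> (illustrative name)
+ *   .withParsingFn(myParseCallback)
+ *   // coder for the parsed type T (illustrative name)
+ *   .withCoder(myCoder))
+ *
+ * }</pre>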
+ */
+public class MongoDbGridFSIO {
+
+  /**
+   * Function for parsing the GridFSDBFile into objects for the PCollection.
+   * @param <T> the type of the elements produced by the parser
+   */
+  public interface ParseCallback<T> extends Serializable {
+    /**
+     * Each value parsed from the file should be output as an
+     * Iterable of Line&lt;T&gt;.  If timestamp is omitted, it will
+     * use the uploadDate of the GridFSDBFile.
+     */
+    public static class Line<T> {
+      final Instant timestamp;
+      final T value;
+
+      public Line(T value, Instant timestamp) {
+        this.value = value;
+        this.timestamp = timestamp;
+      }
+      public Line(T value) {
+        this.value = value;
+        this.timestamp = null;
+      }
+    }
+    public Iterator<Line<T>> parse(GridFSDBFile input) throws IOException;
+  }
+
+  /**
+   * Default implementation for parsing the InputStream into a collection of
+   * Strings, splitting on line breaks.
+   */
+  private static class StringsParseCallback implements ParseCallback<String> {
+    static final StringsParseCallback INSTANCE = new StringsParseCallback();
+
+    @Override
+    public Iterator<Line<String>> parse(final GridFSDBFile input) throws IOException {
+      final BufferedReader reader =
+          new BufferedReader(new InputStreamReader(input.getInputStream()));
+      return new Iterator<Line<String>>() {
+        String val = reader.readLine();
+        @Override
+        public boolean hasNext() {
+          return val != null;
+        }
+
+        @Override
+        public Line<String> next() {
+          Line<String> l = new Line<String>(val);
+          try {
+            val = reader.readLine();
+          } catch (IOException e) {
+            val = null;
+          }
+          return l;
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException("Remove not supported");
+        }
+      };
+    }
+  }
+
+  /** Read data from GridFS. */
+  public static Read<String> read() {
+    return new Read<String>(new Read.BoundedGridFSSource<String>(null, null, null, null,
+                            StringsParseCallback.INSTANCE, StringUtf8Coder.of()));
+  }
+
+  static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+    public Read<T> withUri(String uri) {
+      return new Read<T>(new BoundedGridFSSource<T>(uri, options.database,
+                                           options.bucket, options.filterJson,
+                                           options.parser, options.coder));
+    }
+
+    public Read<T> withDatabase(String database) {
+      return new Read<T>(new BoundedGridFSSource<T>(options.uri, database,
+                                           options.bucket, options.filterJson,
+                                           options.parser, options.coder));
+    }
+
+    public Read<T> withBucket(String bucket) {
+      return new Read<T>(new BoundedGridFSSource<T>(options.uri, options.database, bucket,
+          options.filterJson, options.parser, options.coder));
+    }
+
+    public <X> Read<X> withParsingFn(ParseCallback<X> f) {
+      return new Read<X>(new BoundedGridFSSource<X>(options.uri, options.database,
+          options.bucket, options.filterJson, f, null));
+    }
+
+    public Read<T> withCoder(Coder<T> coder) {
+      return new Read<T>(new BoundedGridFSSource<T>(options.uri, options.database,
+          options.bucket, options.filterJson, options.parser, coder));
+    }
+
+    public Read<T> withQueryFilter(String filterJson) {
+      return new Read<T>(new BoundedGridFSSource<T>(options.uri, options.database,
+          options.bucket, filterJson, options.parser, options.coder));
+    }
+
+    private final BoundedGridFSSource<T> options;
+
+    Read(BoundedGridFSSource<T> options) {
+      this.options = options;
+    }
+
+    @Override
+    public PCollection<T> apply(PBegin input) {
+      org.apache.beam.sdk.io.Read.Bounded<T> bounded =
+          org.apache.beam.sdk.io.Read.from(options);
+      PCollection<T> output = input.getPipeline().apply(bounded);
+      if (options.coder != null) {
+        output.setCoder(options.coder);
+      }
+      return output;
+    }
+
+    static class BoundedGridFSSource<T> extends BoundedSource<T> {
+      @Nullable
+      private final String uri;
+      @Nullable
+      private final String database;
+      @Nullable
+      private final String bucket;
+      @Nullable
+      private final String filterJson;
+      @Nullable
+      private final ParseCallback<T> parser;
+      @Nullable
+      private final Coder<T> coder;
+      @Nullable
+      private List<ObjectId> objectIds;
+      private transient Mongo mongo;
+      private transient GridFS gridfs;
+
+      BoundedGridFSSource(String uri, String database, String bucket, String filterJson,
+          ParseCallback<T> parser, Coder<T> coder) {
+        this.uri = uri;
+        this.database = database;
+        this.bucket = bucket;
+        this.parser = parser;
+        this.coder = coder;
+        this.filterJson = filterJson;
+      }
+      BoundedGridFSSource(String uri, String database, String bucket, String filterJson,
+          ParseCallback<T> parser, Coder<T> coder, List<ObjectId> objectIds) {
+        this.uri = uri;
+        this.database = database;
+        this.bucket = bucket;
+        this.parser = parser;
+        this.coder = coder;
+        this.objectIds = objectIds;
+        this.filterJson = filterJson;
+      }
+      private synchronized void setupGridFS() {
+        if (gridfs == null) {
+          mongo = uri == null ? new Mongo() : new Mongo(new MongoURI(uri));
+          DB db = database == null ? mongo.getDB("gridfs") : mongo.getDB(database);
+          gridfs = bucket == null ? new GridFS(db) : new GridFS(db, bucket);
+        }
+      }
+      private synchronized void closeGridFS() {
+        if (gridfs != null) {
+          gridfs = null;
+          mongo.close();
+          mongo = null;
+        }
+      }
+
+      @Override
+      public List<? extends BoundedSource<T>> splitIntoBundles(long desiredBundleSizeBytes,
+          PipelineOptions options) throws Exception {
+        try {
+          setupGridFS();
+          DBCursor cursor;
+          if (filterJson != null) {
+            DBObject query = (DBObject) JSON.parse(filterJson);
+            cursor = gridfs.getFileList(query).sort(null);
+          } else {
+            cursor = gridfs.getFileList().sort(null);
+          }
+          long size = 0;
+          List<BoundedGridFSSource<T>> list = new LinkedList<>();
+          List<ObjectId> objects = new LinkedList<>();
+          while (cursor.hasNext()) {
+            GridFSDBFile file = (GridFSDBFile) cursor.next();
+            long len = file.getLength();
+            if ((size + len) > desiredBundleSizeBytes && !objects.isEmpty()) {
+              list.add(new BoundedGridFSSource<T>(uri, database, bucket, filterJson,
+                                                  parser, coder, objects));
+              size = 0;
+              objects = new LinkedList<>();
+            }
+            objects.add((ObjectId) file.getId());
+            size += len;
+          }
+          if (!objects.isEmpty() || list.isEmpty()) {
+            list.add(new BoundedGridFSSource<T>(uri, database, bucket, filterJson,
+                                                parser, coder, objects));
+          }
+          return list;
+        } finally {
+          closeGridFS();
+        }
+      }
+
+      @Override
+      public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+        try {
+          setupGridFS();
+          DBCursor cursor;
+          if (filterJson != null) {
+            DBObject query = (DBObject) JSON.parse(filterJson);
+            cursor = gridfs.getFileList(query).sort(null);
+          } else {
+            cursor = gridfs.getFileList().sort(null);
+          }
+          long size = 0;
+          while (cursor.hasNext()) {
+            GridFSDBFile file = (GridFSDBFile) cursor.next();
+            size += file.getLength();
+          }
+          return size;
+        } finally {
+          closeGridFS();
+        }
+      }
+
+      @Override
+      public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+        return false;
+      }
+
+      @Override
+      public org.apache.beam.sdk.io.BoundedSource.BoundedReader<T> createReader(
+          PipelineOptions options) throws IOException {
+        return new GridFSReader(this);
+      }
+
+      @Override
+      public void validate() {
+      }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        super.populateDisplayData(builder);
+        builder.addIfNotNull(DisplayData.item("uri", uri));
+        builder.addIfNotNull(DisplayData.item("database", database));
+        builder.addIfNotNull(DisplayData.item("bucket", bucket));
+        builder.addIfNotNull(DisplayData.item("filterJson", filterJson));
+      }
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public Coder<T> getDefaultOutputCoder() {
+        if (coder != null) {
+          return coder;
+        }
+        return (Coder<T>) SerializableCoder.of(Serializable.class);
+      }
+
+      class GridFSReader extends org.apache.beam.sdk.io.BoundedSource.BoundedReader<T> {
+        final BoundedGridFSSource<T> source;
+
+        Instant timestamp = Instant.now();
+        Iterator<ParseCallback.Line<T>> currentIterator;
+        ParseCallback.Line<T> currentLine;
+
+        GridFSReader(BoundedGridFSSource<T> source) {
+          this.source = source;
+        }
+
+        @Override
+        public boolean start() throws IOException {
+          setupGridFS();
+          if (objectIds == null) {
+            objectIds = new LinkedList<>();
+            DBCursor cursor = gridfs.getFileList().sort(null);
+            while (cursor.hasNext()) {
+              DBObject ob = cursor.next();
+              objectIds.add((ObjectId) ob.get("_id"));
+            }
+          }
+          return advance();
+        }
+
+        @Override
+        public boolean advance() throws IOException {
+          if (currentIterator != null && !currentIterator.hasNext()) {
+            objectIds.remove(0);
+            currentIterator = null;
+          }
+          if (currentIterator == null) {
+            if (objectIds.isEmpty()) {
+              return false;
+            }
+            ObjectId oid = objectIds.get(0);
+            GridFSDBFile file = gridfs.find(oid);
+            if (file == null) {
+              return false;
+            }
+            timestamp = new Instant(file.getUploadDate().getTime());
+            currentIterator = parser.parse(file);
+          }
+
+          if (currentIterator.hasNext()) {
+            currentLine = currentIterator.next();
+            return true;
+          }
+          return false;
+        }
+
+        @Override
+        public BoundedSource<T> getCurrentSource() {
+          return source;
+        }
+
+        @Override
+        public T getCurrent() throws NoSuchElementException {
+          if (currentLine != null) {
+            return currentLine.value;
+          }
+          throw new NoSuchElementException();
+        }
+
+        @Override
+        public Instant getCurrentTimestamp() throws NoSuchElementException {
+          if (currentLine != null) {
+            if (currentLine.timestamp != null) {
+              return currentLine.timestamp;
+            }
+            return timestamp;
+          }
+          throw new NoSuchElementException();
+        }
+
+        @Override
+        public void close() throws IOException {
+          closeGridFS();
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68c8c787/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
new file mode 100644
index 0000000..f8e5f77
--- /dev/null
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import static org.junit.Assert.assertEquals;
+
+import com.mongodb.DB;
+import com.mongodb.Mongo;
+import com.mongodb.gridfs.GridFS;
+import com.mongodb.gridfs.GridFSDBFile;
+import com.mongodb.gridfs.GridFSInputFile;
+
+import de.flapdoodle.embed.mongo.MongodExecutable;
+import de.flapdoodle.embed.mongo.MongodStarter;
+import de.flapdoodle.embed.mongo.config.IMongodConfig;
+import de.flapdoodle.embed.mongo.config.MongoCmdOptionsBuilder;
+import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
+import de.flapdoodle.embed.mongo.config.Net;
+import de.flapdoodle.embed.mongo.config.Storage;
+import de.flapdoodle.embed.mongo.distribution.Version;
+import de.flapdoodle.embed.process.io.file.Files;
+import de.flapdoodle.embed.process.runtime.Network;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.Scanner;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.mongodb.MongoDbGridFSIO.ParseCallback;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Test on the MongoDbGridFSIO.
+ */
+public class MongoDBGridFSIOTest implements Serializable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MongoDBGridFSIOTest.class);
+
+  private static final String MONGODB_LOCATION = "target/mongodb";
+  private static final int PORT = 27017;
+  private static final String DATABASE = "gridfs";
+
+  private transient MongodExecutable mongodExecutable;
+
+  @Before
+  public void setup() throws Exception {
+    LOGGER.info("Starting MongoDB embedded instance");
+    try {
+      Files.forceDelete(new File(MONGODB_LOCATION));
+    } catch (Exception e) {
+      // ignore: the data directory may not exist on the first run
+    }
+    new File(MONGODB_LOCATION).mkdirs();
+    IMongodConfig mongodConfig = new MongodConfigBuilder()
+        .version(Version.Main.PRODUCTION)
+        .configServer(false)
+        .replication(new Storage(MONGODB_LOCATION, null, 0))
+        .net(new Net("localhost", PORT, Network.localhostIsIPv6()))
+        .cmdOptions(new MongoCmdOptionsBuilder()
+            .syncDelay(10)
+            .useNoPrealloc(true)
+            .useSmallFiles(true)
+            .useNoJournal(true)
+            .build())
+        .build();
+    mongodExecutable = MongodStarter.getDefaultInstance().prepare(mongodConfig);
+    mongodExecutable.start();
+
+    LOGGER.info("Insert test data");
+
+    Mongo client = new Mongo("localhost", PORT);
+    DB database = client.getDB(DATABASE);
+    GridFS gridfs = new GridFS(database);
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    for (int x = 0; x < 100; x++) {
+      out.write(("Einstein\nDarwin\nCopernicus\nPasteur\n"
+                  + "Curie\nFaraday\nNewton\nBohr\nGalilei\nMaxwell\n").getBytes());
+    }
+    for (int x = 0; x < 5; x++) {
+      gridfs.createFile(new ByteArrayInputStream(out.toByteArray()), "file" + x).save();
+    }
+
+    gridfs = new GridFS(database, "mapBucket");
+    long now = System.currentTimeMillis();
+    Random random = new Random();
+    String[] scientists = {"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday",
+        "Newton", "Bohr", "Galilei", "Maxwell"};
+    for (int x = 0; x < 10; x++) {
+      GridFSInputFile file = gridfs.createFile("file_" + x);
+      OutputStream outf = file.getOutputStream();
+      OutputStreamWriter writer = new OutputStreamWriter(outf);
+      for (int y = 0; y < 5000; y++) {
+        long time = now - random.nextInt(3600000);
+        String name = scientists[y % scientists.length];
+        writer.write(Long.toString(time) + "\t");
+        writer.write(name + "\t");
+        writer.write(Integer.toString(random.nextInt(100)));
+        writer.write("\n");
+      }
+      for (int y = 0; y < scientists.length; y++) {
+        String name = scientists[y % scientists.length];
+        writer.write(Long.toString(now) + "\t");
+        writer.write(name + "\t");
+        writer.write("101");
+        writer.write("\n");
+      }
+      writer.flush();
+      writer.close();
+    }
+  }
+
+  @After
+  public void stop() throws Exception {
+    LOGGER.info("Stopping MongoDB instance");
+    mongodExecutable.stop();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testFullRead() throws Exception {
+    TestPipeline pipeline = TestPipeline.create();
+
+    PCollection<String> output = pipeline.apply(
+        MongoDbGridFSIO.read()
+            .withUri("mongodb://localhost:" + PORT)
+            .withDatabase(DATABASE));
+
+    PAssert.thatSingleton(
+        output.apply("Count All", Count.<String>globally()))
+        .isEqualTo(5000L);
+
+    PAssert.that(
+      output.apply("Count PerElement", Count.<String>perElement()))
+      .satisfies(new SerializableFunction<Iterable<KV<String, Long>>, Void>() {
+      @Override
+      public Void apply(Iterable<KV<String, Long>> input) {
+        for (KV<String, Long> element : input) {
+          assertEquals(500L, element.getValue().longValue());
+        }
+        return null;
+      }
+    });
+
+    pipeline.run();
+  }
+
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testReadWithParser() throws Exception {
+    TestPipeline pipeline = TestPipeline.create();
+
+    PCollection<KV<String, Integer>> output = pipeline.apply(
+        MongoDbGridFSIO.read()
+            .withUri("mongodb://localhost:" + PORT)
+            .withDatabase(DATABASE)
+            .withBucket("mapBucket")
+            .withParsingFn(new ParseCallback<KV<String, Integer>>() {
+              @Override
+              public Iterator<MongoDbGridFSIO.ParseCallback.Line<KV<String, Integer>>> parse(
+                  GridFSDBFile input) throws IOException {
+                final BufferedReader reader =
+                    new BufferedReader(new InputStreamReader(input.getInputStream()));
+                return new Iterator<Line<KV<String, Integer>>>() {
+                  String line = reader.readLine();
+                  @Override
+                  public boolean hasNext() {
+                    return line != null;
+                  }
+                  @Override
+                  public MongoDbGridFSIO.ParseCallback.Line<KV<String, Integer>> next() {
+                    try (Scanner scanner = new Scanner(line.trim())) {
+                      scanner.useDelimiter("\\t");
+                      long timestamp = scanner.nextLong();
+                      String name = scanner.next();
+                      int score = scanner.nextInt();
+
+                      try {
+                        line = reader.readLine();
+                      } catch (IOException e) {
+                        line = null;
+                      }
+                      if (line == null) {
+                        try {
+                          reader.close();
+                        } catch (IOException e) {
+                          //ignore
+                        }
+                      }
+                      return new Line<>(KV.of(name, score), new Instant(timestamp));
+                    }
+                  }
+                  @Override
+                  public void remove() {
+                    // read-only iterator: remove is not needed in this test
+                  }
+                };
+              }
+            })).setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
+
+    PAssert.thatSingleton(output.apply("Count All", Count.<KV<String, Integer>>globally()))
+        .isEqualTo(50100L);
+
+    PAssert.that(output.apply("Max PerElement", Max.<String>integersPerKey()))
+      .satisfies(new SerializableFunction<Iterable<KV<String, Integer>>, Void>() {
+      @Override
+      public Void apply(Iterable<KV<String, Integer>> input) {
+        for (KV<String, Integer> element : input) {
+          assertEquals(101, element.getValue().longValue());
+        }
+        return null;
+      }
+    });
+    pipeline.run();
+  }
+
+}


[2/2] incubator-beam git commit: [BEAM-674] This closes #1003

Posted by jb...@apache.org.
[BEAM-674] This closes #1003


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/307d592d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/307d592d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/307d592d

Branch: refs/heads/master
Commit: 307d592d2fd3a6d7bd78fc6243292ff8045b3fdc
Parents: 3879db0 68c8c78
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed Sep 28 17:54:52 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Sep 28 17:54:52 2016 +0200

----------------------------------------------------------------------
 sdks/java/io/mongodb/pom.xml                    |   6 +-
 .../beam/sdk/io/mongodb/MongoDbGridFSIO.java    | 427 +++++++++++++++++++
 .../sdk/io/mongodb/MongoDBGridFSIOTest.java     | 257 +++++++++++
 3 files changed, 689 insertions(+), 1 deletion(-)
----------------------------------------------------------------------