You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2016/10/02 13:43:37 UTC

[1/2] incubator-beam git commit: [BEAM-674] Refactoring and improvements on the MongoDB GridFS IO

Repository: incubator-beam
Updated Branches:
  refs/heads/master c5c343659 -> 2e0adaf02


[BEAM-674] Refactoring and improvements on the MongoDB GridFS IO


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/54854f86
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/54854f86
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/54854f86

Branch: refs/heads/master
Commit: 54854f86346f805008c0d459caf402dd0ad4e46d
Parents: c5c3436
Author: Daniel Kulp <dk...@apache.org>
Authored: Wed Sep 28 22:44:37 2016 -0400
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun Oct 2 15:11:39 2016 +0200

----------------------------------------------------------------------
 .../beam/sdk/io/mongodb/MongoDbGridFSIO.java    | 403 ++++++++++---------
 .../sdk/io/mongodb/MongoDBGridFSIOTest.java     | 100 +++--
 2 files changed, 270 insertions(+), 233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54854f86/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
index 337e5f5..cebda64 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.mongodb;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import com.mongodb.DB;
 import com.mongodb.DBCursor;
 import com.mongodb.DBObject;
@@ -30,8 +31,8 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.NoSuchElementException;
 
@@ -42,11 +43,14 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.bson.types.ObjectId;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 
@@ -55,10 +59,12 @@ import org.joda.time.Instant;
  * <p>
  * <h3>Reading from MongoDB via GridFS</h3>
  * <p>
- * <p>MongoDbGridFSIO source returns a bounded collection of String as {@code PCollection<String>}.
+ * <p>MongoDbGridFSIO source returns a bounded collection of Objects as {@code PCollection<T>}.
  * <p>
- * <p>To configure the MongoDB source, you have to provide the connection URI, the database name
- * and the bucket name. The following example illustrates various options for configuring the
+ * <p>To configure the MongoDB GridFS source, you can provide the connection URI, the database name
+ * and the bucket name.  If unspecified, the default values from the GridFS driver are used.
+ *
+ * The following example illustrates various options for configuring the
  * source:</p>
  * <p>
  * <pre>{@code
@@ -73,132 +79,172 @@ import org.joda.time.Instant;
  * <p>The source also accepts an optional configuration: {@code withQueryFilter()} allows you to
  * define a JSON filter to get subset of files in the database.</p>
  *
- * <p>There is also an optional {@code ParseCallback} that can be specified that can be used to
+ * <p>There is also an optional {@code Parser} that can be specified that can be used to
  * parse the InputStream into objects usable with Beam.  By default, MongoDbGridFSIO will parse
  * into Strings, splitting on line breaks and using the uploadDate of the file as the timestamp.
+ * When using a parser that outputs with custom timestamps, you may also need to specify
+ * the allowedTimestampSkew option.
  */
 public class MongoDbGridFSIO {
 
   /**
-   * Function for parsing the GridFSDBFile into objects for the PCollection.
-   * @param <T>
+   * Callback for the parser to use to submit data.
    */
-  public interface ParseCallback<T> extends Serializable {
+  public interface ParserCallback<T> extends Serializable {
     /**
-     * Each value parsed from the file should be output as an
-     * Iterable of Line&lt;T&gt;.  If timestamp is omitted, it will
-     * use the uploadDate of the GridFSDBFile.
+     * Output the object.  The default timestamp will be the GridFSDBFile
+     * creation timestamp.
+     * @param output
      */
-    public static class Line<T> {
-      final Instant timestamp;
-      final T value;
+    public void output(T output);
 
-      public Line(T value, Instant timestamp) {
-        this.value = value;
-        this.timestamp = timestamp;
-      }
-      public Line(T value) {
-        this.value = value;
-        this.timestamp = null;
-      }
-    };
-    public Iterator<Line<T>> parse(GridFSDBFile input) throws IOException;
+    /**
+     * Output the object using the specified timestamp.
+     * @param output
+     * @param timestamp
+     */
+    public void output(T output, Instant timestamp);
   }
 
   /**
-   * Default implementation for parsing the InputStream to collection of
-   * strings splitting on the cr/lf.
+   * Interface for the parser that is used to parse the GridFSDBFile into
+   * the appropriate types.
+   * @param <T>
    */
-  private static class StringsParseCallback implements ParseCallback<String> {
-    static final StringsParseCallback INSTANCE = new StringsParseCallback();
+  public interface Parser<T> extends Serializable {
+    public void parse(GridFSDBFile input, ParserCallback<T> callback) throws IOException;
+  }
 
+  /**
+   * For the default {@code Read<String>} case, this is the parser that is used to
+   * split the input file into Strings. It uses the timestamp of the file
+   * for the event timestamp.
+   */
+  private static final Parser<String> TEXT_PARSER = new Parser<String>() {
     @Override
-    public Iterator<Line<String>> parse(final GridFSDBFile input) throws IOException {
-      final BufferedReader reader =
-          new BufferedReader(new InputStreamReader(input.getInputStream()));
-      return new Iterator<Line<String>>() {
-        String val = reader.readLine();
-        @Override
-        public boolean hasNext() {
-          return val != null;
-        }
-
-        @Override
-        public Line<String> next() {
-          Line<String> l = new Line<String>(val);
-          try {
-            val = reader.readLine();
-          } catch (IOException e) {
-            val = null;
-          }
-          return l;
+    public void parse(GridFSDBFile input, ParserCallback<String> callback)
+        throws IOException {
+      final Instant time = new Instant(input.getUploadDate().getTime());
+      try (BufferedReader reader =
+        new BufferedReader(new InputStreamReader(input.getInputStream()))) {
+        for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+          callback.output(line, time);
         }
-
-        @Override
-        public void remove() {
-          throw new UnsupportedOperationException("Remove not supported");
-        }
-      };
+      }
     }
-  }
+  };
 
   /** Read data from GridFS. */
   public static Read<String> read() {
-    return new Read<String>(new Read.BoundedGridFSSource<String>(null, null, null, null,
-                            StringsParseCallback.INSTANCE, StringUtf8Coder.of()));
+    return new Read<String>(new Read.BoundedGridFSSource(null, null, null, null,
+                            null), TEXT_PARSER, StringUtf8Coder.of(), Duration.ZERO);
   }
 
   static class Read<T> extends PTransform<PBegin, PCollection<T>> {
     public Read<T> withUri(String uri) {
-      return new Read<T>(new BoundedGridFSSource<T>(uri, options.database,
+      return new Read<T>(new BoundedGridFSSource(uri, options.database,
                                            options.bucket, options.filterJson,
-                                           options.parser, options.coder));
+                                           null), parser, coder, allowedTimestampSkew);
     }
 
     public Read<T> withDatabase(String database) {
-      return new Read<T>(new BoundedGridFSSource<T>(options.uri, database,
+      return new Read<T>(new BoundedGridFSSource(options.uri, database,
                                            options.bucket, options.filterJson,
-                                           options.parser, options.coder));
+                                           null), parser, coder, allowedTimestampSkew);
     }
 
     public Read<T> withBucket(String bucket) {
-      return new Read<T>(new BoundedGridFSSource<T>(options.uri, options.database, bucket,
-          options.filterJson, options.parser, options.coder));
+      return new Read<T>(new BoundedGridFSSource(options.uri, options.database, bucket,
+          options.filterJson, null), parser, coder, allowedTimestampSkew);
     }
 
-    public <X> Read<X> withParsingFn(ParseCallback<X> f) {
-      return new Read<X>(new BoundedGridFSSource<X>(options.uri, options.database,
-          options.bucket, options.filterJson, f, null));
+    public <X> Read<X> withParser(Parser<X> f) {
+      return withParser(f, null);
     }
-
-    public Read<T> withCoder(Coder<T> coder) {
-      return new Read<T>(new BoundedGridFSSource<T>(options.uri, options.database,
-          options.bucket, options.filterJson, options.parser, coder));
+    public <X> Read<X> withParser(Parser<X> f, Coder<X> coder) {
+      checkNotNull(f, "Parser cannot be null");
+      //coder can be null as it can be set on the output directly
+      return new Read<X>(new BoundedGridFSSource(options.uri, options.database,
+          options.bucket, options.filterJson, null), f, coder, allowedTimestampSkew);
+    }
+    public Read<T> allowedTimestampSkew(Duration skew) {
+      return new Read<T>(new BoundedGridFSSource(options.uri, options.database,
+          options.bucket, options.filterJson, null),
+          parser, coder, skew == null ? Duration.ZERO : skew);
     }
 
     public Read<T> withQueryFilter(String filterJson) {
-      return new Read<T>(new BoundedGridFSSource<T>(options.uri, options.database,
-          options.bucket, filterJson, options.parser, options.coder));
+      return new Read<T>(new BoundedGridFSSource(options.uri, options.database,
+          options.bucket, filterJson, null), parser, coder, allowedTimestampSkew);
     }
 
-    private final BoundedGridFSSource<T> options;
+    private final BoundedGridFSSource options;
+    private final Parser<T> parser;
+    private final Coder<T> coder;
+    private final Duration allowedTimestampSkew;
 
-    Read(BoundedGridFSSource<T> options) {
+    Read(BoundedGridFSSource options, Parser<T> parser,
+        Coder<T> coder, Duration allowedTimestampSkew) {
       this.options = options;
+      this.parser = parser;
+      this.allowedTimestampSkew = allowedTimestampSkew;
+      this.coder = coder;
+    }
+
+    BoundedGridFSSource getSource() {
+      return options;
     }
 
     @Override
     public PCollection<T> apply(PBegin input) {
-      org.apache.beam.sdk.io.Read.Bounded<T> unbounded =
+      org.apache.beam.sdk.io.Read.Bounded<ObjectId> objectIds =
           org.apache.beam.sdk.io.Read.from(options);
-      PCollection<T> output = input.getPipeline().apply(unbounded);
-      if (options.coder != null) {
-        output.setCoder(options.coder);
+      PCollection<T> output = input.getPipeline().apply(objectIds)
+          .apply(ParDo.of(new DoFn<ObjectId, T>() {
+            Mongo mongo;
+            GridFS gridfs;
+
+            @Setup
+            public void setup() {
+              mongo = options.setupMongo();
+              gridfs = options.setupGridFS(mongo);
+            }
+
+            @Teardown
+            public void teardown() {
+              mongo.close();
+            }
+
+            @ProcessElement
+            public void processElement(final ProcessContext c) throws IOException {
+              ObjectId oid = c.element();
+              GridFSDBFile file = gridfs.find(oid);
+              parser.parse(file, new ParserCallback<T>() {
+                @Override
+                public void output(T output, Instant timestamp) {
+                  checkNotNull(timestamp);
+                  c.outputWithTimestamp(output, timestamp);
+                }
+
+                @Override
+                public void output(T output) {
+                  c.output(output);
+                }
+              });
+            }
+
+            @Override
+            public Duration getAllowedTimestampSkew() {
+              return allowedTimestampSkew;
+            }
+          }));
+      if (coder != null) {
+        output.setCoder(coder);
       }
       return output;
     }
 
-    static class BoundedGridFSSource<T> extends BoundedSource<T> {
+    static class BoundedGridFSSource extends BoundedSource<ObjectId> {
       @Nullable
       private final String uri;
       @Nullable
@@ -208,96 +254,75 @@ public class MongoDbGridFSIO {
       @Nullable
       private final String filterJson;
       @Nullable
-      private final ParseCallback<T> parser;
-      @Nullable
-      private final Coder<T> coder;
-      @Nullable
       private List<ObjectId> objectIds;
-      private transient Mongo mongo;
-      private transient GridFS gridfs;
 
-      BoundedGridFSSource(String uri, String database, String bucket, String filterJson,
-          ParseCallback<T> parser, Coder<T> coder) {
+      BoundedGridFSSource(String uri, String database,
+                          String bucket, String filterJson,
+                          List<ObjectId> objectIds) {
         this.uri = uri;
         this.database = database;
         this.bucket = bucket;
-        this.parser = parser;
-        this.coder = coder;
-        this.filterJson = filterJson;
-      }
-      BoundedGridFSSource(String uri, String database, String bucket, String filterJson,
-          ParseCallback<T> parser, Coder<T> coder, List<ObjectId> objectIds) {
-        this.uri = uri;
-        this.database = database;
-        this.bucket = bucket;
-        this.parser = parser;
-        this.coder = coder;
         this.objectIds = objectIds;
         this.filterJson = filterJson;
       }
-      private synchronized void setupGridFS() {
-        if (gridfs == null) {
-          mongo = uri == null ? new Mongo() : new Mongo(new MongoURI(uri));
-          DB db = database == null ? mongo.getDB("gridfs") : mongo.getDB(database);
-          gridfs = bucket == null ? new GridFS(db) : new GridFS(db, bucket);
-        }
+
+      private Mongo setupMongo() {
+        return uri == null ? new Mongo() : new Mongo(new MongoURI(uri));
       }
-      private synchronized void closeGridFS() {
-        if (gridfs != null) {
-          gridfs = null;
-          mongo.close();
-          mongo = null;
+
+      private GridFS setupGridFS(Mongo mongo) {
+        DB db = database == null ? mongo.getDB("gridfs") : mongo.getDB(database);
+        return bucket == null ? new GridFS(db) : new GridFS(db, bucket);
+      }
+
+      private DBCursor createCursor(GridFS gridfs) {
+        if (filterJson != null) {
+          DBObject query = (DBObject) JSON.parse(filterJson);
+          return gridfs.getFileList(query).sort(null);
         }
+        return gridfs.getFileList().sort(null);
       }
 
       @Override
-      public List<? extends BoundedSource<T>> splitIntoBundles(long desiredBundleSizeBytes,
+      public List<? extends BoundedSource<ObjectId>> splitIntoBundles(long desiredBundleSizeBytes,
           PipelineOptions options) throws Exception {
+        Mongo mongo = setupMongo();
         try {
-          setupGridFS();
-          DBCursor cursor;
-          if (filterJson != null) {
-            DBObject query = (DBObject) JSON.parse(filterJson);
-            cursor = gridfs.getFileList(query).sort(null);
-          } else {
-            cursor = gridfs.getFileList().sort(null);
-          }
-                    long size = 0;
-          List<BoundedGridFSSource<T>> list = new LinkedList<>();
-          List<ObjectId> objects = new LinkedList<>();
+          GridFS gridfs = setupGridFS(mongo);
+          DBCursor cursor = createCursor(gridfs);
+          long size = 0;
+          List<BoundedGridFSSource> list = new ArrayList<>();
+          List<ObjectId> objects = new ArrayList<>();
           while (cursor.hasNext()) {
             GridFSDBFile file = (GridFSDBFile) cursor.next();
             long len = file.getLength();
             if ((size + len) > desiredBundleSizeBytes && !objects.isEmpty()) {
-              list.add(new BoundedGridFSSource<T>(uri, database, bucket, filterJson,
-                                                  parser, coder, objects));
+              list.add(new BoundedGridFSSource(uri, database, bucket,
+                                                 filterJson,
+                                                 objects));
               size = 0;
-              objects = new LinkedList<>();
+              objects = new ArrayList<>();
             }
             objects.add((ObjectId) file.getId());
             size += len;
           }
           if (!objects.isEmpty() || list.isEmpty()) {
-            list.add(new BoundedGridFSSource<T>(uri, database, bucket, filterJson,
-                                                parser, coder, objects));
+            list.add(new BoundedGridFSSource(uri, database, bucket,
+                                             filterJson,
+                                             objects));
           }
           return list;
         } finally {
-          closeGridFS();
+          mongo.close();
         }
       }
 
       @Override
       public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+        Mongo mongo = setupMongo();
         try {
-          setupGridFS();
-          DBCursor cursor;
-          if (filterJson != null) {
-            DBObject query = (DBObject) JSON.parse(filterJson);
-            cursor = gridfs.getFileList(query).sort(null);
-          } else {
-            cursor = gridfs.getFileList().sort(null);
-          }
+          GridFS gridfs = setupGridFS(mongo);
+          DBCursor cursor = createCursor(gridfs);
           long size = 0;
           while (cursor.hasNext()) {
             GridFSDBFile file = (GridFSDBFile) cursor.next();
@@ -305,7 +330,7 @@ public class MongoDbGridFSIO {
           }
           return size;
         } finally {
-          closeGridFS();
+          mongo.close();
         }
       }
 
@@ -315,9 +340,9 @@ public class MongoDbGridFSIO {
       }
 
       @Override
-      public org.apache.beam.sdk.io.BoundedSource.BoundedReader<T> createReader(
+      public BoundedSource.BoundedReader<ObjectId> createReader(
           PipelineOptions options) throws IOException {
-        return new GridFSReader(this);
+        return new GridFSReader(this, objectIds);
       }
 
       @Override
@@ -333,93 +358,85 @@ public class MongoDbGridFSIO {
         builder.addIfNotNull(DisplayData.item("filterJson", filterJson));
       }
 
-      @SuppressWarnings("unchecked")
       @Override
-      public Coder<T> getDefaultOutputCoder() {
-        if (coder != null) {
-          return coder;
-        }
-        return (Coder<T>) SerializableCoder.of(Serializable.class);
+      public Coder<ObjectId> getDefaultOutputCoder() {
+        return SerializableCoder.of(ObjectId.class);
       }
 
-      class GridFSReader extends org.apache.beam.sdk.io.BoundedSource.BoundedReader<T> {
-        final BoundedGridFSSource<T> source;
+      static class GridFSReader extends BoundedSource.BoundedReader<ObjectId> {
+        final BoundedGridFSSource source;
+
+        /* When split into bundles, this records the ObjectId's of the files for
+         * this bundle.  Otherwise, this is null.  When null, a DBCursor of the
+         * files is used directly to avoid having the ObjectId's queried and
+         * loaded ahead of time saving time and memory.
+         */
+        @Nullable
+        final List<ObjectId> objects;
 
-        Instant timestamp = Instant.now();
-        Iterator<ParseCallback.Line<T>> currentIterator;
-        ParseCallback.Line<T> currentLine;
+        Mongo mongo;
+        DBCursor cursor;
+        Iterator<ObjectId> iterator;
+        ObjectId current;
 
-        GridFSReader(BoundedGridFSSource<T> source) {
+        GridFSReader(BoundedGridFSSource source, List<ObjectId> objects) {
           this.source = source;
+          this.objects = objects;
+        }
+
+        @Override
+        public BoundedSource<ObjectId> getCurrentSource() {
+          return source;
         }
 
         @Override
         public boolean start() throws IOException {
-          setupGridFS();
-          if (objectIds == null) {
-            objectIds = new LinkedList<>();
-            DBCursor cursor = gridfs.getFileList().sort(null);
-            while (cursor.hasNext()) {
-              DBObject ob = cursor.next();
-              objectIds.add((ObjectId) ob.get("_id"));
-            }
+          if (objects == null) {
+            mongo = source.setupMongo();
+            GridFS gridfs = source.setupGridFS(mongo);
+            cursor = source.createCursor(gridfs);
+          } else {
+            iterator = objects.iterator();
           }
           return advance();
         }
 
         @Override
         public boolean advance() throws IOException {
-          if (currentIterator != null && !currentIterator.hasNext()) {
-            objectIds.remove(0);
-            currentIterator = null;
-          }
-          if (currentIterator == null) {
-            if (objectIds.isEmpty()) {
-              return false;
-            }
-            ObjectId oid = objectIds.get(0);
-            GridFSDBFile file = gridfs.find(oid);
-            if (file == null) {
-              return false;
-            }
-            timestamp = new Instant(file.getUploadDate().getTime());
-            currentIterator = parser.parse(file);
-          }
-
-          if (currentIterator.hasNext()) {
-            currentLine = currentIterator.next();
+          if (iterator != null && iterator.hasNext()) {
+            current = iterator.next();
+            return true;
+          } else if (cursor != null && cursor.hasNext()) {
+            GridFSDBFile file = (GridFSDBFile) cursor.next();
+            current = (ObjectId) file.getId();
             return true;
           }
+          current = null;
           return false;
         }
 
         @Override
-        public BoundedSource<T> getCurrentSource() {
-          return source;
-        }
-
-        @Override
-        public T getCurrent() throws NoSuchElementException {
-          if (currentLine != null) {
-            return currentLine.value;
+        public ObjectId getCurrent() throws NoSuchElementException {
+          if (current == null) {
+            throw new NoSuchElementException();
           }
-          throw new NoSuchElementException();
+          return current;
         }
 
-        @Override
         public Instant getCurrentTimestamp() throws NoSuchElementException {
-          if (currentLine != null) {
-            if (currentLine.timestamp != null) {
-              return currentLine.timestamp;
-            }
-            return timestamp;
+          if (current == null) {
+            throw new NoSuchElementException();
           }
-          throw new NoSuchElementException();
+          long time = current.getTimestamp();
+          time *= 1000L;
+          return new Instant(time);
         }
 
         @Override
         public void close() throws IOException {
-          closeGridFS();
+          if (mongo != null) {
+            mongo.close();
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54854f86/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
index f8e5f77..e5e5c8f 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
@@ -45,25 +45,31 @@ import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.Serializable;
-import java.util.Iterator;
+import java.util.List;
 import java.util.Random;
 import java.util.Scanner;
 
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.io.mongodb.MongoDbGridFSIO.ParseCallback;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.mongodb.MongoDbGridFSIO.Read.BoundedGridFSSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.bson.types.ObjectId;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
@@ -80,10 +86,10 @@ public class MongoDBGridFSIOTest implements Serializable {
   private static final int PORT = 27017;
   private static final String DATABASE = "gridfs";
 
-  private transient MongodExecutable mongodExecutable;
+  private static transient MongodExecutable mongodExecutable;
 
-  @Before
-  public void setup() throws Exception {
+  @BeforeClass
+  public static void setup() throws Exception {
     LOGGER.info("Starting MongoDB embedded instance");
     try {
       Files.forceDelete(new File(MONGODB_LOCATION));
@@ -148,10 +154,11 @@ public class MongoDBGridFSIOTest implements Serializable {
       writer.flush();
       writer.close();
     }
+    client.close();
   }
 
-  @After
-  public void stop() throws Exception {
+  @AfterClass
+  public static void stop() throws Exception {
     LOGGER.info("Stopping MongoDB instance");
     mongodExecutable.stop();
   }
@@ -196,47 +203,28 @@ public class MongoDBGridFSIOTest implements Serializable {
             .withUri("mongodb://localhost:" + PORT)
             .withDatabase(DATABASE)
             .withBucket("mapBucket")
-            .withParsingFn(new ParseCallback<KV<String, Integer>>() {
+            .withParser(new MongoDbGridFSIO.Parser<KV<String, Integer>>() {
               @Override
-              public Iterator<MongoDbGridFSIO.ParseCallback.Line<KV<String, Integer>>> parse(
-                  GridFSDBFile input) throws IOException {
-                final BufferedReader reader =
-                    new BufferedReader(new InputStreamReader(input.getInputStream()));
-                return new Iterator<Line<KV<String, Integer>>>() {
+              public void parse(GridFSDBFile input,
+                  MongoDbGridFSIO.ParserCallback<KV<String, Integer>> callback) throws IOException {
+                try (final BufferedReader reader =
+                    new BufferedReader(new InputStreamReader(input.getInputStream()))) {
                   String line = reader.readLine();
-                  @Override
-                  public boolean hasNext() {
-                    return line != null;
-                  }
-                  @Override
-                  public MongoDbGridFSIO.ParseCallback.Line<KV<String, Integer>> next() {
+                  while (line != null) {
                     try (Scanner scanner = new Scanner(line.trim())) {
                       scanner.useDelimiter("\\t");
                       long timestamp = scanner.nextLong();
                       String name = scanner.next();
                       int score = scanner.nextInt();
-
-                      try {
-                        line = reader.readLine();
-                      } catch (IOException e) {
-                        line = null;
-                      }
-                      if (line == null) {
-                        try {
-                          reader.close();
-                        } catch (IOException e) {
-                          //ignore
-                        }
-                      }
-                      return new Line<>(KV.of(name, score), new Instant(timestamp));
+                      callback.output(KV.of(name, score), new Instant(timestamp));
                     }
+                    line = reader.readLine();
                   }
-                  @Override
-                  public void remove() {
-                  }
-                };
+                }
               }
-            })).setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
+            })
+            .allowedTimestampSkew(new Duration(3601000L)))
+            .setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
 
     PAssert.thatSingleton(output.apply("Count All", Count.<KV<String, Integer>>globally()))
         .isEqualTo(50100L);
@@ -251,7 +239,39 @@ public class MongoDBGridFSIOTest implements Serializable {
         return null;
       }
     });
+
     pipeline.run();
   }
 
+  @Test
+  public void testSplit() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    MongoDbGridFSIO.Read<String> read = MongoDbGridFSIO.read()
+        .withUri("mongodb://localhost:" + PORT)
+        .withDatabase(DATABASE);
+
+    BoundedGridFSSource src = read.getSource();
+
+    // make sure 2 files can fit in
+    long desiredBundleSizeBytes = (src.getEstimatedSizeBytes(options) * 2L) / 5L + 1000;
+    List<? extends BoundedSource<ObjectId>> splits = src.splitIntoBundles(
+        desiredBundleSizeBytes, options);
+
+    int expectedNbSplits = 3;
+    assertEquals(expectedNbSplits, splits.size());
+    SourceTestUtils.
+      assertSourcesEqualReferenceSource(src, splits, options);
+    int nonEmptySplits = 0;
+    int count = 0;
+    for (BoundedSource<ObjectId> subSource : splits) {
+      List<ObjectId> result = SourceTestUtils.readFromSource(subSource, options);
+      if (result.size() > 0) {
+        nonEmptySplits += 1;
+      }
+      count += result.size();
+    }
+    assertEquals(expectedNbSplits, nonEmptySplits);
+    assertEquals(5, count);
+  }
+
 }


[2/2] incubator-beam git commit: [BEAM-674] This closes #1025

Posted by jb...@apache.org.
[BEAM-674] This closes #1025


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2e0adaf0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2e0adaf0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2e0adaf0

Branch: refs/heads/master
Commit: 2e0adaf0223adde897cd1b2134014db673474fe8
Parents: c5c3436 54854f8
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Sun Oct 2 15:12:42 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun Oct 2 15:12:42 2016 +0200

----------------------------------------------------------------------
 .../beam/sdk/io/mongodb/MongoDbGridFSIO.java    | 403 ++++++++++---------
 .../sdk/io/mongodb/MongoDBGridFSIOTest.java     | 100 +++--
 2 files changed, 270 insertions(+), 233 deletions(-)
----------------------------------------------------------------------