You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2016/10/02 13:43:37 UTC
[1/2] incubator-beam git commit: [BEAM-674] Refactoring and improvements on the MongoDB GridFS IO
Repository: incubator-beam
Updated Branches:
refs/heads/master c5c343659 -> 2e0adaf02
[BEAM-674] Refactoring and improvements on the MongoDB GridFS IO
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/54854f86
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/54854f86
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/54854f86
Branch: refs/heads/master
Commit: 54854f86346f805008c0d459caf402dd0ad4e46d
Parents: c5c3436
Author: Daniel Kulp <dk...@apache.org>
Authored: Wed Sep 28 22:44:37 2016 -0400
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun Oct 2 15:11:39 2016 +0200
----------------------------------------------------------------------
.../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 403 ++++++++++---------
.../sdk/io/mongodb/MongoDBGridFSIOTest.java | 100 +++--
2 files changed, 270 insertions(+), 233 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54854f86/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
index 337e5f5..cebda64 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.mongodb;
+import static com.google.common.base.Preconditions.checkNotNull;
import com.mongodb.DB;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
@@ -30,8 +31,8 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
@@ -42,11 +43,14 @@ import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.bson.types.ObjectId;
+import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -55,10 +59,12 @@ import org.joda.time.Instant;
* <p>
* <h3>Reading from MongoDB via GridFS</h3>
* <p>
- * <p>MongoDbGridFSIO source returns a bounded collection of String as {@code PCollection<String>}.
+ * <p>MongoDbGridFSIO source returns a bounded collection of Objects as {@code PCollection<T>}.
* <p>
- * <p>To configure the MongoDB source, you have to provide the connection URI, the database name
- * and the bucket name. The following example illustrates various options for configuring the
+ * <p>To configure the MongoDB GridFS source, you can provide the connection URI, the database name
+ * and the bucket name. If unspecified, the default values from the GridFS driver are used.
+ *
+ * The following example illustrates various options for configuring the
* source:</p>
* <p>
* <pre>{@code
@@ -73,132 +79,172 @@ import org.joda.time.Instant;
* <p>The source also accepts an optional configuration: {@code withQueryFilter()} allows you to
* define a JSON filter to get subset of files in the database.</p>
*
- * <p>There is also an optional {@code ParseCallback} that can be specified that can be used to
+ * <p>There is also an optional {@code Parser} that can be specified that can be used to
* parse the InputStream into objects usable with Beam. By default, MongoDbGridFSIO will parse
* into Strings, splitting on line breaks and using the uploadDate of the file as the timestamp.
+ * When using a parser that outputs with custom timestamps, you may also need to specify
+ * the allowedTimestampSkew option.
*/
public class MongoDbGridFSIO {
/**
- * Function for parsing the GridFSDBFile into objects for the PCollection.
- * @param <T>
+ * Callback for the parser to use to submit data.
*/
- public interface ParseCallback<T> extends Serializable {
+ public interface ParserCallback<T> extends Serializable {
/**
- * Each value parsed from the file should be output as an
- * Iterable of Line<T>. If timestamp is omitted, it will
- * use the uploadDate of the GridFSDBFile.
+ * Output the object. The default timestamp will be the GridFSDBFile
+ * creation timestamp.
+ * @param output
*/
- public static class Line<T> {
- final Instant timestamp;
- final T value;
+ public void output(T output);
- public Line(T value, Instant timestamp) {
- this.value = value;
- this.timestamp = timestamp;
- }
- public Line(T value) {
- this.value = value;
- this.timestamp = null;
- }
- };
- public Iterator<Line<T>> parse(GridFSDBFile input) throws IOException;
+ /**
+ * Output the object using the specified timestamp.
+ * @param output
+ * @param timestamp
+ */
+ public void output(T output, Instant timestamp);
}
/**
- * Default implementation for parsing the InputStream to collection of
- * strings splitting on the cr/lf.
+ * Interface for the parser that is used to parse the GridFSDBFile into
+ * the appropriate types.
+ * @param <T>
*/
- private static class StringsParseCallback implements ParseCallback<String> {
- static final StringsParseCallback INSTANCE = new StringsParseCallback();
+ public interface Parser<T> extends Serializable {
+ public void parse(GridFSDBFile input, ParserCallback<T> callback) throws IOException;
+ }
+ /**
+ * For the default {@code Read<String>} case, this is the parser that is used to
+ * split the input file into Strings. It uses the timestamp of the file
+ * for the event timestamp.
+ */
+ private static final Parser<String> TEXT_PARSER = new Parser<String>() {
@Override
- public Iterator<Line<String>> parse(final GridFSDBFile input) throws IOException {
- final BufferedReader reader =
- new BufferedReader(new InputStreamReader(input.getInputStream()));
- return new Iterator<Line<String>>() {
- String val = reader.readLine();
- @Override
- public boolean hasNext() {
- return val != null;
- }
-
- @Override
- public Line<String> next() {
- Line<String> l = new Line<String>(val);
- try {
- val = reader.readLine();
- } catch (IOException e) {
- val = null;
- }
- return l;
+ public void parse(GridFSDBFile input, ParserCallback<String> callback)
+ throws IOException {
+ final Instant time = new Instant(input.getUploadDate().getTime());
+ try (BufferedReader reader =
+ new BufferedReader(new InputStreamReader(input.getInputStream()))) {
+ for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+ callback.output(line, time);
}
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Remove not supported");
- }
- };
+ }
}
- }
+ };
/** Read data from GridFS. */
public static Read<String> read() {
- return new Read<String>(new Read.BoundedGridFSSource<String>(null, null, null, null,
- StringsParseCallback.INSTANCE, StringUtf8Coder.of()));
+ return new Read<String>(new Read.BoundedGridFSSource(null, null, null, null,
+ null), TEXT_PARSER, StringUtf8Coder.of(), Duration.ZERO);
}
static class Read<T> extends PTransform<PBegin, PCollection<T>> {
public Read<T> withUri(String uri) {
- return new Read<T>(new BoundedGridFSSource<T>(uri, options.database,
+ return new Read<T>(new BoundedGridFSSource(uri, options.database,
options.bucket, options.filterJson,
- options.parser, options.coder));
+ null), parser, coder, allowedTimestampSkew);
}
public Read<T> withDatabase(String database) {
- return new Read<T>(new BoundedGridFSSource<T>(options.uri, database,
+ return new Read<T>(new BoundedGridFSSource(options.uri, database,
options.bucket, options.filterJson,
- options.parser, options.coder));
+ null), parser, coder, allowedTimestampSkew);
}
public Read<T> withBucket(String bucket) {
- return new Read<T>(new BoundedGridFSSource<T>(options.uri, options.database, bucket,
- options.filterJson, options.parser, options.coder));
+ return new Read<T>(new BoundedGridFSSource(options.uri, options.database, bucket,
+ options.filterJson, null), parser, coder, allowedTimestampSkew);
}
- public <X> Read<X> withParsingFn(ParseCallback<X> f) {
- return new Read<X>(new BoundedGridFSSource<X>(options.uri, options.database,
- options.bucket, options.filterJson, f, null));
+ public <X> Read<X> withParser(Parser<X> f) {
+ return withParser(f, null);
}
-
- public Read<T> withCoder(Coder<T> coder) {
- return new Read<T>(new BoundedGridFSSource<T>(options.uri, options.database,
- options.bucket, options.filterJson, options.parser, coder));
+ public <X> Read<X> withParser(Parser<X> f, Coder<X> coder) {
+ checkNotNull(f, "Parser cannot be null");
+ //coder can be null as it can be set on the output directly
+ return new Read<X>(new BoundedGridFSSource(options.uri, options.database,
+ options.bucket, options.filterJson, null), f, coder, allowedTimestampSkew);
+ }
+ public Read<T> allowedTimestampSkew(Duration skew) {
+ return new Read<T>(new BoundedGridFSSource(options.uri, options.database,
+ options.bucket, options.filterJson, null),
+ parser, coder, skew == null ? Duration.ZERO : skew);
}
public Read<T> withQueryFilter(String filterJson) {
- return new Read<T>(new BoundedGridFSSource<T>(options.uri, options.database,
- options.bucket, filterJson, options.parser, options.coder));
+ return new Read<T>(new BoundedGridFSSource(options.uri, options.database,
+ options.bucket, filterJson, null), parser, coder, allowedTimestampSkew);
}
- private final BoundedGridFSSource<T> options;
+ private final BoundedGridFSSource options;
+ private final Parser<T> parser;
+ private final Coder<T> coder;
+ private final Duration allowedTimestampSkew;
- Read(BoundedGridFSSource<T> options) {
+ Read(BoundedGridFSSource options, Parser<T> parser,
+ Coder<T> coder, Duration allowedTimestampSkew) {
this.options = options;
+ this.parser = parser;
+ this.allowedTimestampSkew = allowedTimestampSkew;
+ this.coder = coder;
+ }
+
+ BoundedGridFSSource getSource() {
+ return options;
}
@Override
public PCollection<T> apply(PBegin input) {
- org.apache.beam.sdk.io.Read.Bounded<T> unbounded =
+ org.apache.beam.sdk.io.Read.Bounded<ObjectId> objectIds =
org.apache.beam.sdk.io.Read.from(options);
- PCollection<T> output = input.getPipeline().apply(unbounded);
- if (options.coder != null) {
- output.setCoder(options.coder);
+ PCollection<T> output = input.getPipeline().apply(objectIds)
+ .apply(ParDo.of(new DoFn<ObjectId, T>() {
+ Mongo mongo;
+ GridFS gridfs;
+
+ @Setup
+ public void setup() {
+ mongo = options.setupMongo();
+ gridfs = options.setupGridFS(mongo);
+ }
+
+ @Teardown
+ public void teardown() {
+ mongo.close();
+ }
+
+ @ProcessElement
+ public void processElement(final ProcessContext c) throws IOException {
+ ObjectId oid = c.element();
+ GridFSDBFile file = gridfs.find(oid);
+ parser.parse(file, new ParserCallback<T>() {
+ @Override
+ public void output(T output, Instant timestamp) {
+ checkNotNull(timestamp);
+ c.outputWithTimestamp(output, timestamp);
+ }
+
+ @Override
+ public void output(T output) {
+ c.output(output);
+ }
+ });
+ }
+
+ @Override
+ public Duration getAllowedTimestampSkew() {
+ return allowedTimestampSkew;
+ }
+ }));
+ if (coder != null) {
+ output.setCoder(coder);
}
return output;
}
- static class BoundedGridFSSource<T> extends BoundedSource<T> {
+ static class BoundedGridFSSource extends BoundedSource<ObjectId> {
@Nullable
private final String uri;
@Nullable
@@ -208,96 +254,75 @@ public class MongoDbGridFSIO {
@Nullable
private final String filterJson;
@Nullable
- private final ParseCallback<T> parser;
- @Nullable
- private final Coder<T> coder;
- @Nullable
private List<ObjectId> objectIds;
- private transient Mongo mongo;
- private transient GridFS gridfs;
- BoundedGridFSSource(String uri, String database, String bucket, String filterJson,
- ParseCallback<T> parser, Coder<T> coder) {
+ BoundedGridFSSource(String uri, String database,
+ String bucket, String filterJson,
+ List<ObjectId> objectIds) {
this.uri = uri;
this.database = database;
this.bucket = bucket;
- this.parser = parser;
- this.coder = coder;
- this.filterJson = filterJson;
- }
- BoundedGridFSSource(String uri, String database, String bucket, String filterJson,
- ParseCallback<T> parser, Coder<T> coder, List<ObjectId> objectIds) {
- this.uri = uri;
- this.database = database;
- this.bucket = bucket;
- this.parser = parser;
- this.coder = coder;
this.objectIds = objectIds;
this.filterJson = filterJson;
}
- private synchronized void setupGridFS() {
- if (gridfs == null) {
- mongo = uri == null ? new Mongo() : new Mongo(new MongoURI(uri));
- DB db = database == null ? mongo.getDB("gridfs") : mongo.getDB(database);
- gridfs = bucket == null ? new GridFS(db) : new GridFS(db, bucket);
- }
+
+ private Mongo setupMongo() {
+ return uri == null ? new Mongo() : new Mongo(new MongoURI(uri));
}
- private synchronized void closeGridFS() {
- if (gridfs != null) {
- gridfs = null;
- mongo.close();
- mongo = null;
+
+ private GridFS setupGridFS(Mongo mongo) {
+ DB db = database == null ? mongo.getDB("gridfs") : mongo.getDB(database);
+ return bucket == null ? new GridFS(db) : new GridFS(db, bucket);
+ }
+
+ private DBCursor createCursor(GridFS gridfs) {
+ if (filterJson != null) {
+ DBObject query = (DBObject) JSON.parse(filterJson);
+ return gridfs.getFileList(query).sort(null);
}
+ return gridfs.getFileList().sort(null);
}
@Override
- public List<? extends BoundedSource<T>> splitIntoBundles(long desiredBundleSizeBytes,
+ public List<? extends BoundedSource<ObjectId>> splitIntoBundles(long desiredBundleSizeBytes,
PipelineOptions options) throws Exception {
+ Mongo mongo = setupMongo();
try {
- setupGridFS();
- DBCursor cursor;
- if (filterJson != null) {
- DBObject query = (DBObject) JSON.parse(filterJson);
- cursor = gridfs.getFileList(query).sort(null);
- } else {
- cursor = gridfs.getFileList().sort(null);
- }
- long size = 0;
- List<BoundedGridFSSource<T>> list = new LinkedList<>();
- List<ObjectId> objects = new LinkedList<>();
+ GridFS gridfs = setupGridFS(mongo);
+ DBCursor cursor = createCursor(gridfs);
+ long size = 0;
+ List<BoundedGridFSSource> list = new ArrayList<>();
+ List<ObjectId> objects = new ArrayList<>();
while (cursor.hasNext()) {
GridFSDBFile file = (GridFSDBFile) cursor.next();
long len = file.getLength();
if ((size + len) > desiredBundleSizeBytes && !objects.isEmpty()) {
- list.add(new BoundedGridFSSource<T>(uri, database, bucket, filterJson,
- parser, coder, objects));
+ list.add(new BoundedGridFSSource(uri, database, bucket,
+ filterJson,
+ objects));
size = 0;
- objects = new LinkedList<>();
+ objects = new ArrayList<>();
}
objects.add((ObjectId) file.getId());
size += len;
}
if (!objects.isEmpty() || list.isEmpty()) {
- list.add(new BoundedGridFSSource<T>(uri, database, bucket, filterJson,
- parser, coder, objects));
+ list.add(new BoundedGridFSSource(uri, database, bucket,
+ filterJson,
+ objects));
}
return list;
} finally {
- closeGridFS();
+ mongo.close();
}
}
@Override
public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ Mongo mongo = setupMongo();
try {
- setupGridFS();
- DBCursor cursor;
- if (filterJson != null) {
- DBObject query = (DBObject) JSON.parse(filterJson);
- cursor = gridfs.getFileList(query).sort(null);
- } else {
- cursor = gridfs.getFileList().sort(null);
- }
+ GridFS gridfs = setupGridFS(mongo);
+ DBCursor cursor = createCursor(gridfs);
long size = 0;
while (cursor.hasNext()) {
GridFSDBFile file = (GridFSDBFile) cursor.next();
@@ -305,7 +330,7 @@ public class MongoDbGridFSIO {
}
return size;
} finally {
- closeGridFS();
+ mongo.close();
}
}
@@ -315,9 +340,9 @@ public class MongoDbGridFSIO {
}
@Override
- public org.apache.beam.sdk.io.BoundedSource.BoundedReader<T> createReader(
+ public BoundedSource.BoundedReader<ObjectId> createReader(
PipelineOptions options) throws IOException {
- return new GridFSReader(this);
+ return new GridFSReader(this, objectIds);
}
@Override
@@ -333,93 +358,85 @@ public class MongoDbGridFSIO {
builder.addIfNotNull(DisplayData.item("filterJson", filterJson));
}
- @SuppressWarnings("unchecked")
@Override
- public Coder<T> getDefaultOutputCoder() {
- if (coder != null) {
- return coder;
- }
- return (Coder<T>) SerializableCoder.of(Serializable.class);
+ public Coder<ObjectId> getDefaultOutputCoder() {
+ return SerializableCoder.of(ObjectId.class);
}
- class GridFSReader extends org.apache.beam.sdk.io.BoundedSource.BoundedReader<T> {
- final BoundedGridFSSource<T> source;
+ static class GridFSReader extends BoundedSource.BoundedReader<ObjectId> {
+ final BoundedGridFSSource source;
+
+ /* When split into bundles, this records the ObjectId's of the files for
+ * this bundle. Otherwise, this is null. When null, a DBCursor of the
+ * files is used directly to avoid having the ObjectId's queried and
+ * loaded ahead of time saving time and memory.
+ */
+ @Nullable
+ final List<ObjectId> objects;
- Instant timestamp = Instant.now();
- Iterator<ParseCallback.Line<T>> currentIterator;
- ParseCallback.Line<T> currentLine;
+ Mongo mongo;
+ DBCursor cursor;
+ Iterator<ObjectId> iterator;
+ ObjectId current;
- GridFSReader(BoundedGridFSSource<T> source) {
+ GridFSReader(BoundedGridFSSource source, List<ObjectId> objects) {
this.source = source;
+ this.objects = objects;
+ }
+
+ @Override
+ public BoundedSource<ObjectId> getCurrentSource() {
+ return source;
}
@Override
public boolean start() throws IOException {
- setupGridFS();
- if (objectIds == null) {
- objectIds = new LinkedList<>();
- DBCursor cursor = gridfs.getFileList().sort(null);
- while (cursor.hasNext()) {
- DBObject ob = cursor.next();
- objectIds.add((ObjectId) ob.get("_id"));
- }
+ if (objects == null) {
+ mongo = source.setupMongo();
+ GridFS gridfs = source.setupGridFS(mongo);
+ cursor = source.createCursor(gridfs);
+ } else {
+ iterator = objects.iterator();
}
return advance();
}
@Override
public boolean advance() throws IOException {
- if (currentIterator != null && !currentIterator.hasNext()) {
- objectIds.remove(0);
- currentIterator = null;
- }
- if (currentIterator == null) {
- if (objectIds.isEmpty()) {
- return false;
- }
- ObjectId oid = objectIds.get(0);
- GridFSDBFile file = gridfs.find(oid);
- if (file == null) {
- return false;
- }
- timestamp = new Instant(file.getUploadDate().getTime());
- currentIterator = parser.parse(file);
- }
-
- if (currentIterator.hasNext()) {
- currentLine = currentIterator.next();
+ if (iterator != null && iterator.hasNext()) {
+ current = iterator.next();
+ return true;
+ } else if (cursor != null && cursor.hasNext()) {
+ GridFSDBFile file = (GridFSDBFile) cursor.next();
+ current = (ObjectId) file.getId();
return true;
}
+ current = null;
return false;
}
@Override
- public BoundedSource<T> getCurrentSource() {
- return source;
- }
-
- @Override
- public T getCurrent() throws NoSuchElementException {
- if (currentLine != null) {
- return currentLine.value;
+ public ObjectId getCurrent() throws NoSuchElementException {
+ if (current == null) {
+ throw new NoSuchElementException();
}
- throw new NoSuchElementException();
+ return current;
}
- @Override
public Instant getCurrentTimestamp() throws NoSuchElementException {
- if (currentLine != null) {
- if (currentLine.timestamp != null) {
- return currentLine.timestamp;
- }
- return timestamp;
+ if (current == null) {
+ throw new NoSuchElementException();
}
- throw new NoSuchElementException();
+ long time = current.getTimestamp();
+ time *= 1000L;
+ return new Instant(time);
}
@Override
public void close() throws IOException {
- closeGridFS();
+ if (mongo != null) {
+ mongo.close();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54854f86/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
index f8e5f77..e5e5c8f 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
@@ -45,25 +45,31 @@ import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Serializable;
-import java.util.Iterator;
+import java.util.List;
import java.util.Random;
import java.util.Scanner;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.io.mongodb.MongoDbGridFSIO.ParseCallback;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.mongodb.MongoDbGridFSIO.Read.BoundedGridFSSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.bson.types.ObjectId;
+import org.joda.time.Duration;
import org.joda.time.Instant;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
@@ -80,10 +86,10 @@ public class MongoDBGridFSIOTest implements Serializable {
private static final int PORT = 27017;
private static final String DATABASE = "gridfs";
- private transient MongodExecutable mongodExecutable;
+ private static transient MongodExecutable mongodExecutable;
- @Before
- public void setup() throws Exception {
+ @BeforeClass
+ public static void setup() throws Exception {
LOGGER.info("Starting MongoDB embedded instance");
try {
Files.forceDelete(new File(MONGODB_LOCATION));
@@ -148,10 +154,11 @@ public class MongoDBGridFSIOTest implements Serializable {
writer.flush();
writer.close();
}
+ client.close();
}
- @After
- public void stop() throws Exception {
+ @AfterClass
+ public static void stop() throws Exception {
LOGGER.info("Stopping MongoDB instance");
mongodExecutable.stop();
}
@@ -196,47 +203,28 @@ public class MongoDBGridFSIOTest implements Serializable {
.withUri("mongodb://localhost:" + PORT)
.withDatabase(DATABASE)
.withBucket("mapBucket")
- .withParsingFn(new ParseCallback<KV<String, Integer>>() {
+ .withParser(new MongoDbGridFSIO.Parser<KV<String, Integer>>() {
@Override
- public Iterator<MongoDbGridFSIO.ParseCallback.Line<KV<String, Integer>>> parse(
- GridFSDBFile input) throws IOException {
- final BufferedReader reader =
- new BufferedReader(new InputStreamReader(input.getInputStream()));
- return new Iterator<Line<KV<String, Integer>>>() {
+ public void parse(GridFSDBFile input,
+ MongoDbGridFSIO.ParserCallback<KV<String, Integer>> callback) throws IOException {
+ try (final BufferedReader reader =
+ new BufferedReader(new InputStreamReader(input.getInputStream()))) {
String line = reader.readLine();
- @Override
- public boolean hasNext() {
- return line != null;
- }
- @Override
- public MongoDbGridFSIO.ParseCallback.Line<KV<String, Integer>> next() {
+ while (line != null) {
try (Scanner scanner = new Scanner(line.trim())) {
scanner.useDelimiter("\\t");
long timestamp = scanner.nextLong();
String name = scanner.next();
int score = scanner.nextInt();
-
- try {
- line = reader.readLine();
- } catch (IOException e) {
- line = null;
- }
- if (line == null) {
- try {
- reader.close();
- } catch (IOException e) {
- //ignore
- }
- }
- return new Line<>(KV.of(name, score), new Instant(timestamp));
+ callback.output(KV.of(name, score), new Instant(timestamp));
}
+ line = reader.readLine();
}
- @Override
- public void remove() {
- }
- };
+ }
}
- })).setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
+ })
+ .allowedTimestampSkew(new Duration(3601000L)))
+ .setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
PAssert.thatSingleton(output.apply("Count All", Count.<KV<String, Integer>>globally()))
.isEqualTo(50100L);
@@ -251,7 +239,39 @@ public class MongoDBGridFSIOTest implements Serializable {
return null;
}
});
+
pipeline.run();
}
+ @Test
+ public void testSplit() throws Exception {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ MongoDbGridFSIO.Read<String> read = MongoDbGridFSIO.read()
+ .withUri("mongodb://localhost:" + PORT)
+ .withDatabase(DATABASE);
+
+ BoundedGridFSSource src = read.getSource();
+
+ // make sure 2 files can fit in
+ long desiredBundleSizeBytes = (src.getEstimatedSizeBytes(options) * 2L) / 5L + 1000;
+ List<? extends BoundedSource<ObjectId>> splits = src.splitIntoBundles(
+ desiredBundleSizeBytes, options);
+
+ int expectedNbSplits = 3;
+ assertEquals(expectedNbSplits, splits.size());
+ SourceTestUtils.
+ assertSourcesEqualReferenceSource(src, splits, options);
+ int nonEmptySplits = 0;
+ int count = 0;
+ for (BoundedSource<ObjectId> subSource : splits) {
+ List<ObjectId> result = SourceTestUtils.readFromSource(subSource, options);
+ if (result.size() > 0) {
+ nonEmptySplits += 1;
+ }
+ count += result.size();
+ }
+ assertEquals(expectedNbSplits, nonEmptySplits);
+ assertEquals(5, count);
+ }
+
}
[2/2] incubator-beam git commit: [BEAM-674] This closes #1025
Posted by jb...@apache.org.
[BEAM-674] This closes #1025
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2e0adaf0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2e0adaf0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2e0adaf0
Branch: refs/heads/master
Commit: 2e0adaf0223adde897cd1b2134014db673474fe8
Parents: c5c3436 54854f8
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Sun Oct 2 15:12:42 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun Oct 2 15:12:42 2016 +0200
----------------------------------------------------------------------
.../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 403 ++++++++++---------
.../sdk/io/mongodb/MongoDBGridFSIOTest.java | 100 +++--
2 files changed, 270 insertions(+), 233 deletions(-)
----------------------------------------------------------------------