You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2016/09/28 15:55:07 UTC
[1/2] incubator-beam git commit: [BEAM-674] Add GridFS support to
MongoDbIO
Repository: incubator-beam
Updated Branches:
refs/heads/master 3879db036 -> 307d592d2
[BEAM-674] Add GridFS support to MongoDbIO
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/68c8c787
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/68c8c787
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/68c8c787
Branch: refs/heads/master
Commit: 68c8c7872720f4e8fbcd017032c0e90e395e905c
Parents: 3879db0
Author: Daniel Kulp <dk...@apache.org>
Authored: Fri Sep 16 16:58:56 2016 -0400
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Sep 28 17:18:46 2016 +0200
----------------------------------------------------------------------
sdks/java/io/mongodb/pom.xml | 6 +-
.../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 427 +++++++++++++++++++
.../sdk/io/mongodb/MongoDBGridFSIOTest.java | 257 +++++++++++
3 files changed, 689 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68c8c787/sdks/java/io/mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/pom.xml b/sdks/java/io/mongodb/pom.xml
index 60f1d1e..b7e36af 100644
--- a/sdks/java/io/mongodb/pom.xml
+++ b/sdks/java/io/mongodb/pom.xml
@@ -89,6 +89,10 @@
<artifactId>mongo-java-driver</artifactId>
<version>${mongo-java-driver.version}</version>
</dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </dependency>
<!-- test dependencies -->
<dependency>
@@ -126,4 +130,4 @@
</dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68c8c787/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
new file mode 100644
index 0000000..337e5f5
--- /dev/null
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.mongodb.DB;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.Mongo;
+import com.mongodb.MongoURI;
+import com.mongodb.gridfs.GridFS;
+import com.mongodb.gridfs.GridFSDBFile;
+import com.mongodb.util.JSON;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.bson.types.ObjectId;
+import org.joda.time.Instant;
+
+
+/**
+ * IO to read and write data on MongoDB GridFS.
+ * <p>
+ * <h3>Reading from MongoDB via GridFS</h3>
+ * <p>
+ * <p>MongoDbGridFSIO source returns a bounded collection of String as {@code PCollection<String>}.
+ * <p>
+ * <p>To configure the MongoDB source, you have to provide the connection URI, the database name
+ * and the bucket name. The following example illustrates various options for configuring the
+ * source:</p>
+ * <p>
+ * <pre>{@code
+ *
+ * pipeline.apply(MongoDbGridFSIO.read()
+ * .withUri("mongodb://localhost:27017")
+ * .withDatabase("my-database")
+ * .withBucket("my-bucket"))
+ *
+ * }</pre>
+ *
+ * <p>The source also accepts an optional configuration: {@code withQueryFilter()} allows you to
+ * define a JSON filter to get subset of files in the database.</p>
+ *
+ * <p>There is also an optional {@code ParseCallback} that can be specified that can be used to
+ * parse the InputStream into objects usable with Beam. By default, MongoDbGridFSIO will parse
+ * into Strings, splitting on line breaks and using the uploadDate of the file as the timestamp.
+ */
+public class MongoDbGridFSIO {
+
+ /**
+ * Function for parsing the GridFSDBFile into objects for the PCollection.
+ * @param <T>
+ */
+ public interface ParseCallback<T> extends Serializable {
+ /**
+ * Each value parsed from the file should be output as an
+ * Iterable of Line<T>. If timestamp is omitted, it will
+ * use the uploadDate of the GridFSDBFile.
+ */
+ public static class Line<T> {
+ final Instant timestamp;
+ final T value;
+
+ public Line(T value, Instant timestamp) {
+ this.value = value;
+ this.timestamp = timestamp;
+ }
+ public Line(T value) {
+ this.value = value;
+ this.timestamp = null;
+ }
+ };
+ public Iterator<Line<T>> parse(GridFSDBFile input) throws IOException;
+ }
+
+ /**
+ * Default implementation for parsing the InputStream to collection of
+ * strings splitting on the cr/lf.
+ */
+ private static class StringsParseCallback implements ParseCallback<String> {
+ static final StringsParseCallback INSTANCE = new StringsParseCallback();
+
+ @Override
+ public Iterator<Line<String>> parse(final GridFSDBFile input) throws IOException {
+ final BufferedReader reader =
+ new BufferedReader(new InputStreamReader(input.getInputStream()));
+ return new Iterator<Line<String>>() {
+ String val = reader.readLine();
+ @Override
+ public boolean hasNext() {
+ return val != null;
+ }
+
+ @Override
+ public Line<String> next() {
+ Line<String> l = new Line<String>(val);
+ try {
+ val = reader.readLine();
+ } catch (IOException e) {
+ val = null;
+ }
+ return l;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Remove not supported");
+ }
+ };
+ }
+ }
+
+ /** Read data from GridFS. */
+ public static Read<String> read() {
+ return new Read<String>(new Read.BoundedGridFSSource<String>(null, null, null, null,
+ StringsParseCallback.INSTANCE, StringUtf8Coder.of()));
+ }
+
+ static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+ public Read<T> withUri(String uri) {
+ return new Read<T>(new BoundedGridFSSource<T>(uri, options.database,
+ options.bucket, options.filterJson,
+ options.parser, options.coder));
+ }
+
+ public Read<T> withDatabase(String database) {
+ return new Read<T>(new BoundedGridFSSource<T>(options.uri, database,
+ options.bucket, options.filterJson,
+ options.parser, options.coder));
+ }
+
+ public Read<T> withBucket(String bucket) {
+ return new Read<T>(new BoundedGridFSSource<T>(options.uri, options.database, bucket,
+ options.filterJson, options.parser, options.coder));
+ }
+
+ public <X> Read<X> withParsingFn(ParseCallback<X> f) {
+ return new Read<X>(new BoundedGridFSSource<X>(options.uri, options.database,
+ options.bucket, options.filterJson, f, null));
+ }
+
+ public Read<T> withCoder(Coder<T> coder) {
+ return new Read<T>(new BoundedGridFSSource<T>(options.uri, options.database,
+ options.bucket, options.filterJson, options.parser, coder));
+ }
+
+ public Read<T> withQueryFilter(String filterJson) {
+ return new Read<T>(new BoundedGridFSSource<T>(options.uri, options.database,
+ options.bucket, filterJson, options.parser, options.coder));
+ }
+
+ private final BoundedGridFSSource<T> options;
+
+ Read(BoundedGridFSSource<T> options) {
+ this.options = options;
+ }
+
+ @Override
+ public PCollection<T> apply(PBegin input) {
+ org.apache.beam.sdk.io.Read.Bounded<T> unbounded =
+ org.apache.beam.sdk.io.Read.from(options);
+ PCollection<T> output = input.getPipeline().apply(unbounded);
+ if (options.coder != null) {
+ output.setCoder(options.coder);
+ }
+ return output;
+ }
+
+ static class BoundedGridFSSource<T> extends BoundedSource<T> {
+ @Nullable
+ private final String uri;
+ @Nullable
+ private final String database;
+ @Nullable
+ private final String bucket;
+ @Nullable
+ private final String filterJson;
+ @Nullable
+ private final ParseCallback<T> parser;
+ @Nullable
+ private final Coder<T> coder;
+ @Nullable
+ private List<ObjectId> objectIds;
+ private transient Mongo mongo;
+ private transient GridFS gridfs;
+
+ BoundedGridFSSource(String uri, String database, String bucket, String filterJson,
+ ParseCallback<T> parser, Coder<T> coder) {
+ this.uri = uri;
+ this.database = database;
+ this.bucket = bucket;
+ this.parser = parser;
+ this.coder = coder;
+ this.filterJson = filterJson;
+ }
+ BoundedGridFSSource(String uri, String database, String bucket, String filterJson,
+ ParseCallback<T> parser, Coder<T> coder, List<ObjectId> objectIds) {
+ this.uri = uri;
+ this.database = database;
+ this.bucket = bucket;
+ this.parser = parser;
+ this.coder = coder;
+ this.objectIds = objectIds;
+ this.filterJson = filterJson;
+ }
+ private synchronized void setupGridFS() {
+ if (gridfs == null) {
+ mongo = uri == null ? new Mongo() : new Mongo(new MongoURI(uri));
+ DB db = database == null ? mongo.getDB("gridfs") : mongo.getDB(database);
+ gridfs = bucket == null ? new GridFS(db) : new GridFS(db, bucket);
+ }
+ }
+ private synchronized void closeGridFS() {
+ if (gridfs != null) {
+ gridfs = null;
+ mongo.close();
+ mongo = null;
+ }
+ }
+
+ @Override
+ public List<? extends BoundedSource<T>> splitIntoBundles(long desiredBundleSizeBytes,
+ PipelineOptions options) throws Exception {
+ try {
+ setupGridFS();
+ DBCursor cursor;
+ if (filterJson != null) {
+ DBObject query = (DBObject) JSON.parse(filterJson);
+ cursor = gridfs.getFileList(query).sort(null);
+ } else {
+ cursor = gridfs.getFileList().sort(null);
+ }
+ long size = 0;
+ List<BoundedGridFSSource<T>> list = new LinkedList<>();
+ List<ObjectId> objects = new LinkedList<>();
+ while (cursor.hasNext()) {
+ GridFSDBFile file = (GridFSDBFile) cursor.next();
+ long len = file.getLength();
+ if ((size + len) > desiredBundleSizeBytes && !objects.isEmpty()) {
+ list.add(new BoundedGridFSSource<T>(uri, database, bucket, filterJson,
+ parser, coder, objects));
+ size = 0;
+ objects = new LinkedList<>();
+ }
+ objects.add((ObjectId) file.getId());
+ size += len;
+ }
+ if (!objects.isEmpty() || list.isEmpty()) {
+ list.add(new BoundedGridFSSource<T>(uri, database, bucket, filterJson,
+ parser, coder, objects));
+ }
+ return list;
+ } finally {
+ closeGridFS();
+ }
+ }
+
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ try {
+ setupGridFS();
+ DBCursor cursor;
+ if (filterJson != null) {
+ DBObject query = (DBObject) JSON.parse(filterJson);
+ cursor = gridfs.getFileList(query).sort(null);
+ } else {
+ cursor = gridfs.getFileList().sort(null);
+ }
+ long size = 0;
+ while (cursor.hasNext()) {
+ GridFSDBFile file = (GridFSDBFile) cursor.next();
+ size += file.getLength();
+ }
+ return size;
+ } finally {
+ closeGridFS();
+ }
+ }
+
+ @Override
+ public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+ return false;
+ }
+
+ @Override
+ public org.apache.beam.sdk.io.BoundedSource.BoundedReader<T> createReader(
+ PipelineOptions options) throws IOException {
+ return new GridFSReader(this);
+ }
+
+ @Override
+ public void validate() {
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.addIfNotNull(DisplayData.item("uri", uri));
+ builder.addIfNotNull(DisplayData.item("database", database));
+ builder.addIfNotNull(DisplayData.item("bucket", bucket));
+ builder.addIfNotNull(DisplayData.item("filterJson", filterJson));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Coder<T> getDefaultOutputCoder() {
+ if (coder != null) {
+ return coder;
+ }
+ return (Coder<T>) SerializableCoder.of(Serializable.class);
+ }
+
+ class GridFSReader extends org.apache.beam.sdk.io.BoundedSource.BoundedReader<T> {
+ final BoundedGridFSSource<T> source;
+
+ Instant timestamp = Instant.now();
+ Iterator<ParseCallback.Line<T>> currentIterator;
+ ParseCallback.Line<T> currentLine;
+
+ GridFSReader(BoundedGridFSSource<T> source) {
+ this.source = source;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ setupGridFS();
+ if (objectIds == null) {
+ objectIds = new LinkedList<>();
+ DBCursor cursor = gridfs.getFileList().sort(null);
+ while (cursor.hasNext()) {
+ DBObject ob = cursor.next();
+ objectIds.add((ObjectId) ob.get("_id"));
+ }
+ }
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (currentIterator != null && !currentIterator.hasNext()) {
+ objectIds.remove(0);
+ currentIterator = null;
+ }
+ if (currentIterator == null) {
+ if (objectIds.isEmpty()) {
+ return false;
+ }
+ ObjectId oid = objectIds.get(0);
+ GridFSDBFile file = gridfs.find(oid);
+ if (file == null) {
+ return false;
+ }
+ timestamp = new Instant(file.getUploadDate().getTime());
+ currentIterator = parser.parse(file);
+ }
+
+ if (currentIterator.hasNext()) {
+ currentLine = currentIterator.next();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public BoundedSource<T> getCurrentSource() {
+ return source;
+ }
+
+ @Override
+ public T getCurrent() throws NoSuchElementException {
+ if (currentLine != null) {
+ return currentLine.value;
+ }
+ throw new NoSuchElementException();
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ if (currentLine != null) {
+ if (currentLine.timestamp != null) {
+ return currentLine.timestamp;
+ }
+ return timestamp;
+ }
+ throw new NoSuchElementException();
+ }
+
+ @Override
+ public void close() throws IOException {
+ closeGridFS();
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68c8c787/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
new file mode 100644
index 0000000..f8e5f77
--- /dev/null
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import static org.junit.Assert.assertEquals;
+
+import com.mongodb.DB;
+import com.mongodb.Mongo;
+import com.mongodb.gridfs.GridFS;
+import com.mongodb.gridfs.GridFSDBFile;
+import com.mongodb.gridfs.GridFSInputFile;
+
+import de.flapdoodle.embed.mongo.MongodExecutable;
+import de.flapdoodle.embed.mongo.MongodStarter;
+import de.flapdoodle.embed.mongo.config.IMongodConfig;
+import de.flapdoodle.embed.mongo.config.MongoCmdOptionsBuilder;
+import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
+import de.flapdoodle.embed.mongo.config.Net;
+import de.flapdoodle.embed.mongo.config.Storage;
+import de.flapdoodle.embed.mongo.distribution.Version;
+import de.flapdoodle.embed.process.io.file.Files;
+import de.flapdoodle.embed.process.runtime.Network;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.Scanner;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.mongodb.MongoDbGridFSIO.ParseCallback;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Test on the MongoDbGridFSIO.
+ */
+public class MongoDBGridFSIOTest implements Serializable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MongoDBGridFSIOTest.class);
+
+  private static final String MONGODB_LOCATION = "target/mongodb";
+  private static final int PORT = 27017;
+  private static final String DATABASE = "gridfs";
+
+  private transient MongodExecutable mongodExecutable;
+
+  /**
+   * Starts an embedded MongoDB instance on {@link #PORT} and loads the test data into the
+   * {@link #DATABASE} database.
+   */
+  @Before
+  public void setup() throws Exception {
+    LOGGER.info("Starting MongoDB embedded instance");
+    try {
+      Files.forceDelete(new File(MONGODB_LOCATION));
+    } catch (Exception ignored) {
+      // Best-effort cleanup of a previous run; the directory may not exist yet.
+    }
+    new File(MONGODB_LOCATION).mkdirs();
+    IMongodConfig mongodConfig = new MongodConfigBuilder()
+        .version(Version.Main.PRODUCTION)
+        .configServer(false)
+        .replication(new Storage(MONGODB_LOCATION, null, 0))
+        .net(new Net("localhost", PORT, Network.localhostIsIPv6()))
+        .cmdOptions(new MongoCmdOptionsBuilder()
+            .syncDelay(10)
+            .useNoPrealloc(true)
+            .useSmallFiles(true)
+            .useNoJournal(true)
+            .build())
+        .build();
+    mongodExecutable = MongodStarter.getDefaultInstance().prepare(mongodConfig);
+    mongodExecutable.start();
+
+    LOGGER.info("Insert test data");
+
+    Mongo client = new Mongo("localhost", PORT);
+    try {
+      DB database = client.getDB(DATABASE);
+      insertTextFiles(new GridFS(database));
+      insertScoreFiles(new GridFS(database, "mapBucket"));
+    } finally {
+      // The pipelines under test open their own connections.
+      client.close();
+    }
+  }
+
+  /**
+   * Writes 5 files ("file0" .. "file4") into the given bucket, each holding 100 repetitions of
+   * the same 10 scientist names, one name per line.
+   */
+  private static void insertTextFiles(GridFS gridfs) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    for (int x = 0; x < 100; x++) {
+      out.write(("Einstein\nDarwin\nCopernicus\nPasteur\n"
+          + "Curie\nFaraday\nNewton\nBohr\nGalilei\nMaxwell\n")
+          .getBytes(java.nio.charset.StandardCharsets.UTF_8));
+    }
+    for (int x = 0; x < 5; x++) {
+      gridfs.createFile(new ByteArrayInputStream(out.toByteArray()), "file" + x).save();
+    }
+  }
+
+  /**
+   * Writes 10 files ("file_0" .. "file_9") of tab-separated "time, name, score" lines. Each
+   * file has 5000 lines with a random score in [0, 100) and a time within the last hour,
+   * followed by one line per scientist with score 101, so 101 is every scientist's maximum.
+   */
+  private static void insertScoreFiles(GridFS gridfs) throws IOException {
+    long now = System.currentTimeMillis();
+    Random random = new Random();
+    String[] scientists = {"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday",
+        "Newton", "Bohr", "Galilei", "Maxwell"};
+    for (int x = 0; x < 10; x++) {
+      GridFSInputFile file = gridfs.createFile("file_" + x);
+      OutputStream outf = file.getOutputStream();
+      // Closing the writer flushes it and closes the GridFS output stream.
+      try (OutputStreamWriter writer =
+          new OutputStreamWriter(outf, java.nio.charset.StandardCharsets.UTF_8)) {
+        for (int y = 0; y < 5000; y++) {
+          long time = now - random.nextInt(3600000);
+          String name = scientists[y % scientists.length];
+          writer.write(time + "\t" + name + "\t" + random.nextInt(100) + "\n");
+        }
+        for (String name : scientists) {
+          writer.write(now + "\t" + name + "\t101\n");
+        }
+      }
+    }
+  }
+
+  @After
+  public void stop() throws Exception {
+    LOGGER.info("Stopping MongoDB instance");
+    // Guard so that a failure in setup() is not masked by a NullPointerException here.
+    if (mongodExecutable != null) {
+      mongodExecutable.stop();
+    }
+  }
+
+  /** Reads the 5 text files of the default bucket: 5000 lines, 500 per scientist. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testFullRead() throws Exception {
+    TestPipeline pipeline = TestPipeline.create();
+
+    PCollection<String> output = pipeline.apply(
+        MongoDbGridFSIO.read()
+            .withUri("mongodb://localhost:" + PORT)
+            .withDatabase(DATABASE));
+
+    PAssert.thatSingleton(
+        output.apply("Count All", Count.<String>globally()))
+        .isEqualTo(5000L);
+
+    PAssert.that(
+        output.apply("Count PerElement", Count.<String>perElement()))
+        .satisfies(new SerializableFunction<Iterable<KV<String, Long>>, Void>() {
+          @Override
+          public Void apply(Iterable<KV<String, Long>> input) {
+            for (KV<String, Long> element : input) {
+              assertEquals(500L, element.getValue().longValue());
+            }
+            return null;
+          }
+        });
+
+    pipeline.run();
+  }
+
+  /**
+   * Reads the score files of {@code mapBucket} with a custom parser: 10 x 5010 = 50100 values,
+   * and a maximum score of 101 for every scientist.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testReadWithParser() throws Exception {
+    TestPipeline pipeline = TestPipeline.create();
+
+    PCollection<KV<String, Integer>> output = pipeline.apply(
+        MongoDbGridFSIO.read()
+            .withUri("mongodb://localhost:" + PORT)
+            .withDatabase(DATABASE)
+            .withBucket("mapBucket")
+            .withParsingFn(new ParseCallback<KV<String, Integer>>() {
+              @Override
+              public Iterator<MongoDbGridFSIO.ParseCallback.Line<KV<String, Integer>>> parse(
+                  GridFSDBFile input) throws IOException {
+                final BufferedReader reader = new BufferedReader(new InputStreamReader(
+                    input.getInputStream(), java.nio.charset.StandardCharsets.UTF_8));
+                return new Iterator<Line<KV<String, Integer>>>() {
+                  String line = reader.readLine();
+
+                  @Override
+                  public boolean hasNext() {
+                    return line != null;
+                  }
+
+                  @Override
+                  public MongoDbGridFSIO.ParseCallback.Line<KV<String, Integer>> next() {
+                    try (Scanner scanner = new Scanner(line.trim())) {
+                      scanner.useDelimiter("\\t");
+                      long timestamp = scanner.nextLong();
+                      String name = scanner.next();
+                      int score = scanner.nextInt();
+
+                      try {
+                        line = reader.readLine();
+                      } catch (IOException e) {
+                        line = null;
+                      }
+                      if (line == null) {
+                        try {
+                          reader.close();
+                        } catch (IOException ignored) {
+                          // Nothing more is read from this file.
+                        }
+                      }
+                      return new Line<>(KV.of(name, score), new Instant(timestamp));
+                    }
+                  }
+
+                  @Override
+                  public void remove() {
+                    throw new UnsupportedOperationException("Remove not supported");
+                  }
+                };
+              }
+            })).setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
+
+    PAssert.thatSingleton(output.apply("Count All", Count.<KV<String, Integer>>globally()))
+        .isEqualTo(50100L);
+
+    PAssert.that(output.apply("Max PerElement", Max.<String>integersPerKey()))
+        .satisfies(new SerializableFunction<Iterable<KV<String, Integer>>, Void>() {
+          @Override
+          public Void apply(Iterable<KV<String, Integer>> input) {
+            for (KV<String, Integer> element : input) {
+              assertEquals(101, element.getValue().longValue());
+            }
+            return null;
+          }
+        });
+    pipeline.run();
+  }
+
+}
[2/2] incubator-beam git commit: [BEAM-674] This closes #1003
Posted by jb...@apache.org.
[BEAM-674] This closes #1003
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/307d592d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/307d592d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/307d592d
Branch: refs/heads/master
Commit: 307d592d2fd3a6d7bd78fc6243292ff8045b3fdc
Parents: 3879db0 68c8c78
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed Sep 28 17:54:52 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Sep 28 17:54:52 2016 +0200
----------------------------------------------------------------------
sdks/java/io/mongodb/pom.xml | 6 +-
.../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 427 +++++++++++++++++++
.../sdk/io/mongodb/MongoDBGridFSIOTest.java | 257 +++++++++++
3 files changed, 689 insertions(+), 1 deletion(-)
----------------------------------------------------------------------