Posted to commits@beam.apache.org by jb...@apache.org on 2016/09/04 19:57:27 UTC
[1/2] incubator-beam git commit: [BEAM-456] Add MongoDbIO
Repository: incubator-beam
Updated Branches:
refs/heads/master a2c223889 -> 8ca683026
[BEAM-456] Add MongoDbIO
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4b60e368
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4b60e368
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4b60e368
Branch: refs/heads/master
Commit: 4b60e36816df66915061f2c834dabf3ca4ac1b89
Parents: a2c2238
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Fri Jul 15 18:44:26 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun Sep 4 21:56:55 2016 +0200
----------------------------------------------------------------------
sdks/java/io/mongodb/pom.xml | 129 +++++
.../apache/beam/sdk/io/mongodb/MongoDbIO.java | 553 +++++++++++++++++++
.../beam/sdk/io/mongodb/package-info.java | 22 +
.../beam/sdk/io/mongodb/MongoDbIOTest.java | 209 +++++++
.../beam/sdk/io/mongodb/package-info.java | 22 +
sdks/java/io/pom.xml | 1 +
6 files changed, 936 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b60e368/sdks/java/io/mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/pom.xml b/sdks/java/io/mongodb/pom.xml
new file mode 100644
index 0000000..60f1d1e
--- /dev/null
+++ b/sdks/java/io/mongodb/pom.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-parent</artifactId>
+ <version>0.3.0-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>beam-sdks-java-io-mongodb</artifactId>
+ <name>Apache Beam :: SDKs :: Java :: IO :: MongoDB</name>
+ <description>IO to read and write on MongoDB.</description>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <properties>
+ <mongo-java-driver.version>3.2.2</mongo-java-driver.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>annotations</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongo-java-driver</artifactId>
+ <version>${mongo-java-driver.version}</version>
+ </dependency>
+
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>de.flapdoodle.embed</groupId>
+ <artifactId>de.flapdoodle.embed.mongo</artifactId>
+ <version>1.50.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>de.flapdoodle.embed</groupId>
+ <artifactId>de.flapdoodle.embed.process</artifactId>
+ <version>1.50.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b60e368/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
new file mode 100644
index 0000000..7724614
--- /dev/null
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoDatabase;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+import org.bson.Document;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IO to read and write data on MongoDB.
+ *
+ * <h3>Reading from MongoDB</h3>
+ *
+ * <p>The MongoDbIO source returns a bounded collection of Strings as a
+ * {@code PCollection<String>}. Each String is the JSON form of a MongoDB Document.</p>
+ *
+ * <p>To configure the MongoDB source, you have to provide the connection URI, the database name
+ * and the collection name. The following example illustrates various options for configuring the
+ * source:</p>
+ *
+ * <pre>{@code
+ *
+ * pipeline.apply(MongoDbIO.read()
+ * .withUri("mongodb://localhost:27017")
+ * .withDatabase("my-database")
+ * .withCollection("my-collection"))
+ *   // the above three are required; returns PCollection<String>
+ *
+ *   // the rest of the settings are optional
+ *
+ * }</pre>
+ *
+ * <p>The source also accepts an optional configuration: {@code withFilter()} allows you to
+ * define a JSON filter to get a subset of the data.</p>
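+ *
+ * <p>For example, the following sketch reads only the documents matching a filter (the
+ * {@code scientist} field is purely illustrative):</p>
+ *
+ * <pre>{@code
+ *
+ * pipeline.apply(MongoDbIO.read()
+ *   .withUri("mongodb://localhost:27017")
+ *   .withDatabase("my-database")
+ *   .withCollection("my-collection")
+ *   .withFilter("{\"scientist\":\"Einstein\"}"))
+ *
+ * }</pre>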
+ *
+ * <h3>Writing to MongoDB</h3>
+ *
+ * <p>The MongoDB sink supports writing Documents (as JSON Strings) to a MongoDB collection.</p>
+ *
+ * <p>To configure a MongoDB sink, you must specify a connection {@code URI}, a {@code Database}
+ * name and a {@code Collection} name. For instance:</p>
+ *
+ * <pre>{@code
+ *
+ * pipeline
+ * .apply(...)
+ * .apply(MongoDbIO.write()
+ * .withUri("mongodb://localhost:27017")
+ * .withDatabase("my-database")
+ * .withCollection("my-collection")
+ *   .withBatchSize(1024))
+ *
+ * }</pre>
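+ *
+ * <p>{@code withBatchSize()} is optional: it defines how many documents are buffered before
+ * they are written to MongoDB in a single {@code insertMany} call (1024 by default).</p>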
+ */
+// TODO instead of JSON String, does it make sense to populate the PCollection with BSON Document or
+// DBObject ??
+public class MongoDbIO {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbIO.class);
+
+ /** Read data from MongoDB. */
+ public static Read read() {
+ return new Read(new BoundedMongoDbSource(null, null, null, null, 0));
+ }
+
+ /** Write data to MongoDB. */
+ public static Write write() {
+ return new Write(new Write.MongoDbWriter(null, null, null, 1024L));
+ }
+
+ private MongoDbIO() {
+ }
+
+ /**
+ * A {@link PTransform} to read data from MongoDB.
+ */
+ public static class Read extends PTransform<PBegin, PCollection<String>> {
+
+ public Read withUri(String uri) {
+ return new Read(source.withUri(uri));
+ }
+
+ public Read withDatabase(String database) {
+ return new Read(source.withDatabase(database));
+ }
+
+ public Read withCollection(String collection) {
+ return new Read(source.withCollection(collection));
+ }
+
+ public Read withFilter(String filter) {
+ return new Read(source.withFilter(filter));
+ }
+
+ public Read withNumSplits(int numSplits) {
+ return new Read(source.withNumSplits(numSplits));
+ }
+
+ private final BoundedMongoDbSource source;
+
+ private Read(BoundedMongoDbSource source) {
+ this.source = source;
+ }
+
+ @Override
+ public PCollection<String> apply(PBegin input) {
+ return input.apply(org.apache.beam.sdk.io.Read.from(getSource()));
+ }
+
+ /**
+ * Creates a {@link BoundedSource} with the configuration in {@link Read}.
+ */
+ @VisibleForTesting
+ BoundedSource<String> getSource() {
+ return source;
+ }
+
+ @Override
+ public void validate(PBegin input) {
+ source.validate();
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+
+ source.populateDisplayData(builder);
+ }
+
+ }
+
+ private static class BoundedMongoDbSource extends BoundedSource<String> {
+
+ public BoundedMongoDbSource withUri(String uri) {
+ return new BoundedMongoDbSource(uri, database, collection, filter, numSplits);
+ }
+
+ public BoundedMongoDbSource withDatabase(String database) {
+ return new BoundedMongoDbSource(uri, database, collection, filter, numSplits);
+ }
+
+ public BoundedMongoDbSource withCollection(String collection) {
+ return new BoundedMongoDbSource(uri, database, collection, filter, numSplits);
+ }
+
+ public BoundedMongoDbSource withFilter(String filter) {
+ return new BoundedMongoDbSource(uri, database, collection, filter, numSplits);
+ }
+
+ public BoundedMongoDbSource withNumSplits(int numSplits) {
+ return new BoundedMongoDbSource(uri, database, collection, filter, numSplits);
+ }
+
+ private final String uri;
+ private final String database;
+ private final String collection;
+ @Nullable
+ private final String filter;
+ private final int numSplits;
+
+ public BoundedMongoDbSource(String uri, String database, String collection, String filter,
+ int numSplits) {
+ this.uri = uri;
+ this.database = database;
+ this.collection = collection;
+ this.filter = filter;
+ this.numSplits = numSplits;
+ }
+
+ @Override
+ public Coder getDefaultOutputCoder() {
+ return SerializableCoder.of(String.class);
+ }
+
+ @Override
+ public void validate() {
+ Preconditions.checkNotNull(uri, "uri");
+ Preconditions.checkNotNull(database, "database");
+ Preconditions.checkNotNull(collection, "collection");
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add(DisplayData.item("uri", uri));
+ builder.add(DisplayData.item("database", database));
+ builder.add(DisplayData.item("collection", collection));
+ builder.addIfNotNull(DisplayData.item("filter", filter));
+ builder.add(DisplayData.item("numSplit", numSplits));
+ }
+
+ @Override
+ public boolean producesSortedKeys(PipelineOptions options) {
+ return false;
+ }
+
+ @Override
+ public BoundedReader createReader(PipelineOptions options) {
+ return new BoundedMongoDbReader(this);
+ }
+
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) {
+ long estimatedSizeBytes = 0L;
+
+ // connect to the configured MongoDB instance rather than the driver default
+ MongoClient mongoClient = new MongoClient(new MongoClientURI(uri));
+ MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
+
+ // get the Mongo collStats object
+ // it gives the size for the entire collection
+ BasicDBObject stat = new BasicDBObject();
+ stat.append("collStats", collection);
+ Document stats = mongoDatabase.runCommand(stat);
+ estimatedSizeBytes = Long.valueOf(stats.get("size").toString());
+ return estimatedSizeBytes;
+ }
+
+ @Override
+ public List<BoundedSource<String>> splitIntoBundles(long desiredBundleSizeBytes,
+ PipelineOptions options) {
+ MongoClient mongoClient = new MongoClient(new MongoClientURI(uri));
+ MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
+
+ List<Document> splitKeys = null;
+ if (numSplits > 0) {
+ // the user defines his desired number of splits
+ // calculate the batch size
+ long estimatedSizeBytes = getEstimatedSizeBytes(options);
+ desiredBundleSizeBytes = estimatedSizeBytes / numSplits;
+ }
+
+ // if the desired bundle size is smaller than 1 MB, use the default chunk size of 1 MB
+ if (desiredBundleSizeBytes < 1024 * 1024) {
+ desiredBundleSizeBytes = 1 * 1024 * 1024;
+ }
+
+ // now we have the bundle size (provided by the user or computed by the runner)
+ // we use Mongo splitVector command to get the split keys
+ BasicDBObject splitVectorCommand = new BasicDBObject();
+ splitVectorCommand.append("splitVector", database + "." + collection);
+ splitVectorCommand.append("keyPattern", new BasicDBObject().append("_id", 1));
+ splitVectorCommand.append("force", false);
+ // maxChunkSize is the Mongo partition size in MB
+ LOGGER.debug("Splitting in chunks of {} MB", desiredBundleSizeBytes / 1024 / 1024);
+ splitVectorCommand.append("maxChunkSize", desiredBundleSizeBytes / 1024 / 1024);
+ Document splitVectorCommandResult = mongoDatabase.runCommand(splitVectorCommand);
+ splitKeys = (List<Document>) splitVectorCommandResult.get("splitKeys");
+
+ List<BoundedSource<String>> sources = new ArrayList<>();
+ if (splitKeys.size() < 1) {
+ LOGGER.debug("No split key, using a single source");
+ sources.add(this);
+ return sources;
+ }
+
+ LOGGER.debug("Number of splits is {}", splitKeys.size());
+ for (String shardFilter : splitKeysToFilters(splitKeys, filter)) {
+ sources.add(this.withFilter(shardFilter));
+ }
+
+ return sources;
+ }
+
+ /**
+ * Transforms a list of split keys into a list of filters containing the corresponding ranges.
+ *
+ * <p>The list of split keys contains BSON Documents, for example:
+ * <ul>
+ * <li>_id: 56</li>
+ * <li>_id: 109</li>
+ * <li>_id: 256</li>
+ * </ul>
+ *
+ * <p>This method generates a list of range filters performing the following splits:
+ * <ul>
+ * <li>from the beginning of the collection up to _id 56, i.e. documents with an _id lower
+ * than or equal to 56</li>
+ * <li>from _id 57 up to _id 109</li>
+ * <li>from _id 110 up to _id 256</li>
+ * <li>from _id 257 up to the end of the collection, i.e. documents with an _id greater
+ * than 256</li>
+ * </ul>
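+ *
+ * <p>As an illustration (reusing the example _id values above, which are not real ObjectIds),
+ * the first range filter produced by this method has the form
+ * {@code { $and: [ {"_id":{$lte:ObjectId("56")}} ]}}, and an intermediate one the form
+ * {@code { $and: [ {"_id":{$gt:ObjectId("56"),$lte:ObjectId("109")}} ]}}; when the user
+ * provides an additional filter, it is appended inside the {@code $and} array.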
+ *
+ * @param splitKeys The list of split keys.
+ * @param additionalFilter A custom (user) additional filter to append to the range filters.
+ * @return A list of filters containing the ranges.
+ */
+ private static List<String> splitKeysToFilters(List<Document> splitKeys, String
+ additionalFilter) {
+ ArrayList<String> filters = new ArrayList<>();
+ String lowestBound = null; // lower boundary (previous split in the iteration)
+ for (int i = 0; i < splitKeys.size(); i++) {
+ String splitKey = splitKeys.get(i).toString();
+ String rangeFilter = null;
+ if (i == 0) {
+ // this is the first split in the list, the filter defines
+ // the range from the beginning up to this split
+ rangeFilter = String.format("{ $and: [ {\"_id\":{$lte:ObjectId(\"%s\")}}",
+ splitKey);
+ } else if (i == splitKeys.size() - 1) {
+ // this is the last split in the list, the filter defines
+ // the range from the split up to the end
+ rangeFilter = String.format("{ $and: [ {\"_id\":{$gt:ObjectId(\"%s\")}}",
+ splitKey);
+ } else {
+ // we are between two splits
+ rangeFilter = String.format("{ $and: [ {\"_id\":{$gt:ObjectId(\"%s\"),"
+ + "$lte:ObjectId(\"%s\")}}", lowestBound, splitKey);
+ }
+ if (additionalFilter != null && !additionalFilter.isEmpty()) {
+ // user provided a filter, we append the user filter to the range filter
+ rangeFilter = String.format("%s,%s ]}", rangeFilter, additionalFilter);
+ } else {
+ // user didn't provide a filter, just cleanly close the range filter
+ rangeFilter = String.format("%s ]}", rangeFilter);
+ }
+
+ filters.add(rangeFilter);
+
+ lowestBound = splitKey;
+ }
+ return filters;
+ }
+ }
+
+ private static class BoundedMongoDbReader extends BoundedSource.BoundedReader<String> {
+
+ private final BoundedMongoDbSource source;
+
+ private MongoClient client;
+ private MongoCursor<Document> cursor;
+ private String current;
+
+ public BoundedMongoDbReader(BoundedMongoDbSource source) {
+ this.source = source;
+ }
+
+ @Override
+ public boolean start() {
+ client = new MongoClient(new MongoClientURI(source.uri));
+
+ MongoDatabase mongoDatabase = client.getDatabase(source.database);
+
+ MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(source.collection);
+
+ if (source.filter == null) {
+ cursor = mongoCollection.find().iterator();
+ } else {
+ Document bson = Document.parse(source.filter);
+ cursor = mongoCollection.find(bson).iterator();
+ }
+
+ return advance();
+ }
+
+ @Override
+ public boolean advance() {
+ if (cursor.hasNext()) {
+ current = cursor.next().toJson();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public BoundedSource getCurrentSource() {
+ return source;
+ }
+
+ @Override
+ public String getCurrent() {
+ return current;
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (cursor != null) {
+ cursor.close();
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Error closing MongoDB cursor", e);
+ }
+ try {
+ client.close();
+ } catch (Exception e) {
+ LOGGER.warn("Error closing MongoDB client", e);
+ }
+ }
+
+ }
+
+ /**
+ * A {@link PTransform} to write to a MongoDB database.
+ */
+ public static class Write extends PTransform<PCollection<String>, PDone> {
+
+ public Write withUri(String uri) {
+ return new Write(writer.withUri(uri));
+ }
+
+ public Write withDatabase(String database) {
+ return new Write(writer.withDatabase(database));
+ }
+
+ public Write withCollection(String collection) {
+ return new Write(writer.withCollection(collection));
+ }
+
+ public Write withBatchSize(long batchSize) {
+ return new Write(writer.withBatchSize(batchSize));
+ }
+
+ private final MongoDbWriter writer;
+
+ private Write(MongoDbWriter writer) {
+ this.writer = writer;
+ }
+
+ @Override
+ public PDone apply(PCollection<String> input) {
+ input.apply(ParDo.of(writer));
+ return PDone.in(input.getPipeline());
+ }
+
+ @Override
+ public void validate(PCollection<String> input) {
+ writer.validate();
+ }
+
+ private static class MongoDbWriter extends DoFn<String, Void> {
+
+ private final String uri;
+ private final String database;
+ private final String collection;
+ private final long batchSize;
+
+ private MongoClient client;
+ private List<Document> batch;
+
+ public MongoDbWriter(String uri, String database, String collection, long batchSize) {
+ this.uri = uri;
+ this.database = database;
+ this.collection = collection;
+ this.batchSize = batchSize;
+ }
+
+ public MongoDbWriter withUri(String uri) {
+ return new MongoDbWriter(uri, database, collection, batchSize);
+ }
+
+ public MongoDbWriter withDatabase(String database) {
+ return new MongoDbWriter(uri, database, collection, batchSize);
+ }
+
+ public MongoDbWriter withCollection(String collection) {
+ return new MongoDbWriter(uri, database, collection, batchSize);
+ }
+
+ public MongoDbWriter withBatchSize(long batchSize) {
+ return new MongoDbWriter(uri, database, collection, batchSize);
+ }
+
+ public void validate() {
+ Preconditions.checkNotNull(uri, "uri");
+ Preconditions.checkNotNull(database, "database");
+ Preconditions.checkNotNull(collection, "collection");
+ Preconditions.checkNotNull(batchSize, "batchSize");
+ }
+
+ @Setup
+ public void createMongoClient() throws Exception {
+ client = new MongoClient(new MongoClientURI(uri));
+ }
+
+ @StartBundle
+ public void startBundle(Context ctx) throws Exception {
+ batch = new ArrayList<>();
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext ctx) throws Exception {
+ String value = ctx.element();
+
+ batch.add(Document.parse(value));
+ if (batch.size() >= batchSize) {
+ finishBundle(ctx);
+ }
+ }
+
+ @FinishBundle
+ public void finishBundle(Context ctx) throws Exception {
+ MongoDatabase mongoDatabase = client.getDatabase(database);
+ MongoCollection mongoCollection = mongoDatabase.getCollection(collection);
+
+ mongoCollection.insertMany(batch);
+
+ batch.clear();
+ }
+
+ @Teardown
+ public void closeMongoClient() throws Exception {
+ client.close();
+ client = null;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b60e368/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/package-info.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/package-info.java
new file mode 100644
index 0000000..fd08b58
--- /dev/null
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading and writing from MongoDB.
+ */
+package org.apache.beam.sdk.io.mongodb;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b60e368/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
new file mode 100644
index 0000000..308e071
--- /dev/null
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import static org.junit.Assert.assertEquals;
+
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoDatabase;
+
+import de.flapdoodle.embed.mongo.MongodExecutable;
+import de.flapdoodle.embed.mongo.MongodStarter;
+import de.flapdoodle.embed.mongo.config.IMongodConfig;
+import de.flapdoodle.embed.mongo.config.MongoCmdOptionsBuilder;
+import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
+import de.flapdoodle.embed.mongo.config.Net;
+import de.flapdoodle.embed.mongo.config.Storage;
+import de.flapdoodle.embed.mongo.distribution.Version;
+import de.flapdoodle.embed.process.io.file.Files;
+import de.flapdoodle.embed.process.runtime.Network;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.ArrayList;
+
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.bson.Document;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for {@link MongoDbIO}.
+ */
+public class MongoDbIOTest implements Serializable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbIOTest.class);
+
+ private static final String MONGODB_LOCATION = "target/mongodb";
+ private static final int PORT = 27017;
+ private static final String DATABASE = "beam";
+ private static final String COLLECTION = "test";
+
+ private transient MongodExecutable mongodExecutable;
+
+ @Before
+ public void setup() throws Exception {
+ LOGGER.info("Starting MongoDB embedded instance");
+ try {
+ Files.forceDelete(new File(MONGODB_LOCATION));
+ } catch (Exception e) {
+ // ignore: the MongoDB location may not exist yet
+ }
+ new File(MONGODB_LOCATION).mkdirs();
+ IMongodConfig mongodConfig = new MongodConfigBuilder()
+ .version(Version.Main.PRODUCTION)
+ .configServer(false)
+ .replication(new Storage(MONGODB_LOCATION, null, 0))
+ .net(new Net("localhost", PORT, Network.localhostIsIPv6()))
+ .cmdOptions(new MongoCmdOptionsBuilder()
+ .syncDelay(10)
+ .useNoPrealloc(true)
+ .useSmallFiles(true)
+ .useNoJournal(true)
+ .build())
+ .build();
+ mongodExecutable = MongodStarter.getDefaultInstance().prepare(mongodConfig);
+ mongodExecutable.start();
+
+ LOGGER.info("Insert test data");
+
+ MongoClient client = new MongoClient("localhost", PORT);
+ MongoDatabase database = client.getDatabase(DATABASE);
+
+ MongoCollection collection = database.getCollection(COLLECTION);
+
+ String[] scientists = {"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday",
+ "Newton", "Bohr", "Galilei", "Maxwell"};
+ for (int i = 1; i <= 1000; i++) {
+ int index = i % scientists.length;
+ Document document = new Document();
+ document.append("_id", i);
+ document.append("scientist", scientists[index]);
+ collection.insertOne(document);
+ }
+
+ }
+
+ @After
+ public void stop() throws Exception {
+ LOGGER.info("Stopping MongoDB instance");
+ mongodExecutable.stop();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testFullRead() throws Exception {
+ TestPipeline pipeline = TestPipeline.create();
+
+ PCollection<String> output = pipeline.apply(
+ MongoDbIO.read()
+ .withUri("mongodb://localhost:" + PORT)
+ .withDatabase(DATABASE)
+ .withCollection(COLLECTION));
+
+ PAssert.thatSingleton(output.apply("Count All", Count.<String>globally()))
+ .isEqualTo(1000L);
+
+ PAssert.that(output
+ .apply("Map Scientist", MapElements.via(new SimpleFunction<String, KV<String, Void>>() {
+ public KV<String, Void> apply(String input) {
+ Document bson = Document.parse(input);
+ return KV.of(bson.getString("scientist"), null);
+ }
+ }))
+ .apply("Count Scientist", Count.<String, Void>perKey())
+ ).satisfies(new SerializableFunction<Iterable<KV<String, Long>>, Void>() {
+ @Override
+ public Void apply(Iterable<KV<String, Long>> input) {
+ for (KV<String, Long> element : input) {
+ assertEquals(100L, element.getValue().longValue());
+ }
+ return null;
+ }
+ });
+
+ pipeline.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testReadWithFilter() throws Exception {
+ TestPipeline pipeline = TestPipeline.create();
+
+ PCollection<String> output = pipeline.apply(
+ MongoDbIO.read()
+ .withUri("mongodb://localhost:" + PORT)
+ .withDatabase(DATABASE)
+ .withCollection(COLLECTION)
+ .withFilter("{\"scientist\":\"Einstein\"}"));
+
+ PAssert.thatSingleton(output.apply("Count", Count.<String>globally()))
+ .isEqualTo(100L);
+
+ pipeline.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testWrite() throws Exception {
+ TestPipeline pipeline = TestPipeline.create();
+
+ ArrayList<String> data = new ArrayList<>();
+ for (int i = 0; i < 10000; i++) {
+ data.add(String.format("{\"scientist\":\"Test %s\"}", i));
+ }
+ pipeline.apply(Create.of(data))
+ .apply(MongoDbIO.write().withUri("mongodb://localhost:" + PORT).withDatabase("test")
+ .withCollection("test"));
+
+ pipeline.run();
+
+ MongoClient client = new MongoClient("localhost", PORT);
+ MongoDatabase database = client.getDatabase("test");
+ MongoCollection collection = database.getCollection("test");
+
+ MongoCursor cursor = collection.find().iterator();
+
+ int count = 0;
+ while (cursor.hasNext()) {
+ count = count + 1;
+ cursor.next();
+ }
+
+ Assert.assertEquals(10000, count);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b60e368/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java
new file mode 100644
index 0000000..fd08b58
--- /dev/null
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading and writing from MongoDB.
+ */
+package org.apache.beam.sdk.io.mongodb;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b60e368/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 6cbd615..c4c32ed 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -38,6 +38,7 @@
<module>jms</module>
<module>kafka</module>
<module>kinesis</module>
+ <module>mongodb</module>
</modules>
</project>
[2/2] incubator-beam git commit: [BEAM-456] This closes #671
Posted by jb...@apache.org.
[BEAM-456] This closes #671
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8ca68302
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8ca68302
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8ca68302
Branch: refs/heads/master
Commit: 8ca683026456c2973a88984cfc718ac8313707ea
Parents: a2c2238 4b60e36
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Sun Sep 4 21:57:19 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun Sep 4 21:57:19 2016 +0200
----------------------------------------------------------------------
sdks/java/io/mongodb/pom.xml | 129 +++++
.../apache/beam/sdk/io/mongodb/MongoDbIO.java | 553 +++++++++++++++++++
.../beam/sdk/io/mongodb/package-info.java | 22 +
.../beam/sdk/io/mongodb/MongoDbIOTest.java | 209 +++++++
.../beam/sdk/io/mongodb/package-info.java | 22 +
sdks/java/io/pom.xml | 1 +
6 files changed, 936 insertions(+)
----------------------------------------------------------------------