Posted to commits@beam.apache.org by jb...@apache.org on 2016/09/04 19:57:27 UTC

[1/2] incubator-beam git commit: [BEAM-456] Add MongoDbIO

Repository: incubator-beam
Updated Branches:
  refs/heads/master a2c223889 -> 8ca683026


[BEAM-456] Add MongoDbIO


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4b60e368
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4b60e368
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4b60e368

Branch: refs/heads/master
Commit: 4b60e36816df66915061f2c834dabf3ca4ac1b89
Parents: a2c2238
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Fri Jul 15 18:44:26 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun Sep 4 21:56:55 2016 +0200

----------------------------------------------------------------------
 sdks/java/io/mongodb/pom.xml                    | 129 +++++
 .../apache/beam/sdk/io/mongodb/MongoDbIO.java   | 553 +++++++++++++++++++
 .../beam/sdk/io/mongodb/package-info.java       |  22 +
 .../beam/sdk/io/mongodb/MongoDbIOTest.java      | 209 +++++++
 .../beam/sdk/io/mongodb/package-info.java       |  22 +
 sdks/java/io/pom.xml                            |   1 +
 6 files changed, 936 insertions(+)
----------------------------------------------------------------------
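
For context, a minimal end-to-end pipeline using the new IO looks like the sketch
below (assuming a local MongoDB instance; the database and collection names are
illustrative, taken from the Javadoc examples in the diff):

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // read each Document of the collection as a JSON String
        .apply(MongoDbIO.read()
            .withUri("mongodb://localhost:27017")
            .withDatabase("my-database")
            .withCollection("my-collection"))
        // write the JSON Strings back as Documents into another collection
        .apply(MongoDbIO.write()
            .withUri("mongodb://localhost:27017")
            .withDatabase("my-database")
            .withCollection("my-other-collection"));
    pipeline.run();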


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b60e368/sdks/java/io/mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/pom.xml b/sdks/java/io/mongodb/pom.xml
new file mode 100644
index 0000000..60f1d1e
--- /dev/null
+++ b/sdks/java/io/mongodb/pom.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-parent</artifactId>
+    <version>0.3.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-io-mongodb</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: MongoDB</name>
+  <description>IO to read and write data on MongoDB.</description>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <properties>
+    <mongo-java-driver.version>3.2.2</mongo-java-driver.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>annotations</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mongodb</groupId>
+      <artifactId>mongo-java-driver</artifactId>
+      <version>${mongo-java-driver.version}</version>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>de.flapdoodle.embed</groupId>
+      <artifactId>de.flapdoodle.embed.mongo</artifactId>
+      <version>1.50.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>de.flapdoodle.embed</groupId>
+      <artifactId>de.flapdoodle.embed.process</artifactId>
+      <version>1.50.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file
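
For downstream users, the new module is consumed like any other SDK artifact; a
hypothetical consumer pom entry (coordinates and version taken from the pom above):

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-mongodb</artifactId>
      <version>0.3.0-incubating-SNAPSHOT</version>
    </dependency>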

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b60e368/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
new file mode 100644
index 0000000..7724614
--- /dev/null
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoDatabase;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+import org.bson.Document;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IO to read and write data on MongoDB.
+ *
+ * <h3>Reading from MongoDB</h3>
+ *
+ * <p>The MongoDbIO source returns a bounded collection of Strings as a
+ * {@code PCollection<String>}. Each String is the JSON form of a MongoDB Document.</p>
+ *
+ * <p>To configure the MongoDB source, you have to provide the connection URI, the database name
+ * and the collection name. The following example illustrates how to configure the source:</p>
+ *
+ * <pre>{@code
+ *
+ * pipeline.apply(MongoDbIO.read()
+ *   .withUri("mongodb://localhost:27017")
+ *   .withDatabase("my-database")
+ *   .withCollection("my-collection"))
+ *   // the three settings above are required; the read returns a PCollection<String>
+ *
+ * }</pre>
+ *
+ * <p>The source also accepts optional configuration: {@code withFilter()} allows you to
+ * define a JSON filter to get a subset of the data, and {@code withNumSplits()} sets the
+ * desired number of splits.</p>
+ *
+ * <h3>Writing to MongoDB</h3>
+ *
+ * <p>The MongoDB sink writes Documents (as JSON Strings) to a MongoDB collection.</p>
+ *
+ * <p>To configure the MongoDB sink, you must specify a connection {@code URI}, a {@code Database}
+ * name and a {@code Collection} name. For instance:</p>
+ *
+ * <pre>{@code
+ *
+ * pipeline
+ *   .apply(...)
+ *   .apply(MongoDbIO.write()
+ *     .withUri("mongodb://localhost:27017")
+ *     .withDatabase("my-database")
+ *     .withCollection("my-collection")
+ *     .withBatchSize(1024))
+ *
+ * }</pre>
+ */
+// TODO instead of JSON String, does it make sense to populate the PCollection with BSON Document or
+//  DBObject ??
+public class MongoDbIO {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbIO.class);
+
+  /** Read data from MongoDB. */
+  public static Read read() {
+    return new Read(new BoundedMongoDbSource(null, null, null, null, 0));
+  }
+
+  /** Write data to MongoDB. */
+  public static Write write() {
+    return new Write(new Write.MongoDbWriter(null, null, null, 1024L));
+  }
+
+  private MongoDbIO() {
+  }
+
+  /**
+   * A {@link PTransform} to read data from MongoDB.
+   */
+  public static class Read extends PTransform<PBegin, PCollection<String>> {
+
+    public Read withUri(String uri) {
+      return new Read(source.withUri(uri));
+    }
+
+    public Read withDatabase(String database) {
+      return new Read(source.withDatabase(database));
+    }
+
+    public Read withCollection(String collection) {
+      return new Read(source.withCollection(collection));
+    }
+
+    public Read withFilter(String filter) {
+      return new Read(source.withFilter(filter));
+    }
+
+    public Read withNumSplits(int numSplits) {
+      return new Read(source.withNumSplits(numSplits));
+    }
+
+    private final BoundedMongoDbSource source;
+
+    private Read(BoundedMongoDbSource source) {
+      this.source = source;
+    }
+
+    @Override
+    public PCollection<String> apply(PBegin input) {
+      return input.apply(org.apache.beam.sdk.io.Read.from(getSource()));
+    }
+
+    /**
+     * Creates a {@link BoundedSource} with the configuration in {@link Read}.
+     */
+    @VisibleForTesting
+    BoundedSource<String> getSource() {
+      return source;
+    }
+
+    @Override
+    public void validate(PBegin input) {
+      source.validate();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+
+      source.populateDisplayData(builder);
+    }
+
+  }
+
+  private static class BoundedMongoDbSource extends BoundedSource<String> {
+
+    public BoundedMongoDbSource withUri(String uri) {
+      return new BoundedMongoDbSource(uri, database, collection, filter, numSplits);
+    }
+
+    public BoundedMongoDbSource withDatabase(String database) {
+      return new BoundedMongoDbSource(uri, database, collection, filter, numSplits);
+    }
+
+    public BoundedMongoDbSource withCollection(String collection) {
+      return new BoundedMongoDbSource(uri, database, collection, filter, numSplits);
+    }
+
+    public BoundedMongoDbSource withFilter(String filter) {
+      return new BoundedMongoDbSource(uri, database, collection, filter, numSplits);
+    }
+
+    public BoundedMongoDbSource withNumSplits(int numSplits) {
+      return new BoundedMongoDbSource(uri, database, collection, filter, numSplits);
+    }
+
+    private final String uri;
+    private final String database;
+    private final String collection;
+    @Nullable
+    private final String filter;
+    private final int numSplits;
+
+    public BoundedMongoDbSource(String uri, String database, String collection, String filter,
+                                int numSplits) {
+      this.uri = uri;
+      this.database = database;
+      this.collection = collection;
+      this.filter = filter;
+      this.numSplits = numSplits;
+    }
+
+    @Override
+    public Coder<String> getDefaultOutputCoder() {
+      return SerializableCoder.of(String.class);
+    }
+
+    @Override
+    public void validate() {
+      Preconditions.checkNotNull(uri, "uri");
+      Preconditions.checkNotNull(database, "database");
+      Preconditions.checkNotNull(collection, "collection");
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.add(DisplayData.item("uri", uri));
+      builder.add(DisplayData.item("database", database));
+      builder.add(DisplayData.item("collection", collection));
+      builder.addIfNotNull(DisplayData.item("filter", filter));
+      builder.add(DisplayData.item("numSplits", numSplits));
+    }
+
+    @Override
+    public boolean producesSortedKeys(PipelineOptions options) {
+      return false;
+    }
+
+    @Override
+    public BoundedReader<String> createReader(PipelineOptions options) {
+      return new BoundedMongoDbReader(this);
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) {
+      // use the configured URI, not the driver default (localhost)
+      MongoClient mongoClient = new MongoClient(new MongoClientURI(uri));
+      try {
+        MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
+
+        // get the Mongo collStats object
+        // it gives the size for the entire collection
+        BasicDBObject stat = new BasicDBObject();
+        stat.append("collStats", collection);
+        Document stats = mongoDatabase.runCommand(stat);
+        return Long.valueOf(stats.get("size").toString());
+      } finally {
+        mongoClient.close();
+      }
+    }
+
+    @Override
+    public List<BoundedSource<String>> splitIntoBundles(long desiredBundleSizeBytes,
+                                                PipelineOptions options) {
+      MongoClient mongoClient = new MongoClient(new MongoClientURI(uri));
+      MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
+
+      if (numSplits > 0) {
+        // the user defined the desired number of splits:
+        // derive the bundle size from the estimated collection size
+        long estimatedSizeBytes = getEstimatedSizeBytes(options);
+        desiredBundleSizeBytes = estimatedSizeBytes / numSplits;
+      }
+
+      // if the desired bundle size is small, use the default chunk size of 1MB
+      if (desiredBundleSizeBytes < 1024 * 1024) {
+        desiredBundleSizeBytes = 1 * 1024 * 1024;
+      }
+
+      // now we have the bundle size (provided by the user or by the runner),
+      // we use the Mongo splitVector command to get the split keys
+      BasicDBObject splitVectorCommand = new BasicDBObject();
+      splitVectorCommand.append("splitVector", database + "." + collection);
+      splitVectorCommand.append("keyPattern", new BasicDBObject().append("_id", 1));
+      splitVectorCommand.append("force", false);
+      // maxChunkSize is the Mongo partition size in MB
+      LOGGER.debug("Splitting in chunks of {} MB", desiredBundleSizeBytes / 1024 / 1024);
+      splitVectorCommand.append("maxChunkSize", desiredBundleSizeBytes / 1024 / 1024);
+      Document splitVectorCommandResult = mongoDatabase.runCommand(splitVectorCommand);
+      List<Document> splitKeys = (List<Document>) splitVectorCommandResult.get("splitKeys");
+
+      List<BoundedSource<String>> sources = new ArrayList<>();
+      if (splitKeys.size() < 1) {
+        LOGGER.debug("No split keys, using a single source");
+        sources.add(this);
+        return sources;
+      }
+
+      LOGGER.debug("Number of split keys is {}", splitKeys.size());
+      for (String shardFilter : splitKeysToFilters(splitKeys, filter)) {
+        sources.add(this.withFilter(shardFilter));
+      }
+
+      return sources;
+    }
+
+    /**
+     * Transform a list of split keys into a list of filters containing the corresponding ranges.
+     *
+     * <p>The list of split keys contains BSON Documents, for example:
+     * <ul>
+     *   <li>_id: 56</li>
+     *   <li>_id: 109</li>
+     *   <li>_id: 256</li>
+     * </ul>
+     *
+     * <p>This method will generate a list of range filters performing the following splits:
+     * <ul>
+     *   <li>from the beginning of the collection up to _id 56, so basically data with
+     *   _id lower than or equal to 56</li>
+     *   <li>from _id 57 up to _id 109</li>
+     *   <li>from _id 110 up to _id 256</li>
+     *   <li>from _id 257 up to the end of the collection, so basically data with _id greater
+     *   than 256</li>
+     * </ul>
+     *
+     * @param splitKeys The list of split keys.
+     * @param additionalFilter A custom (user) additional filter to append to the range filters.
+     * @return A list of filters containing the ranges.
+     */
+    private static List<String> splitKeysToFilters(List<Document> splitKeys, String
+        additionalFilter) {
+      ArrayList<String> filters = new ArrayList<>();
+      String lowestBound = null; // lower boundary (previous split in the iteration)
+      for (int i = 0; i < splitKeys.size(); i++) {
+        // extract the _id value from the split key Document
+        String splitKey = splitKeys.get(i).get("_id").toString();
+        String rangeFilter;
+        if (i == 0) {
+          // this is the first split in the list, the filter defines
+          // the range from the beginning up to this split
+          rangeFilter = String.format("{ $and: [ {\"_id\":{$lte:ObjectId(\"%s\")}}",
+              splitKey);
+        } else {
+          // we are between two splits
+          rangeFilter = String.format("{ $and: [ {\"_id\":{$gt:ObjectId(\"%s\"),"
+              + "$lte:ObjectId(\"%s\")}}", lowestBound, splitKey);
+        }
+        if (additionalFilter != null && !additionalFilter.isEmpty()) {
+          // user provided a filter, we append the user filter to the range filter
+          rangeFilter = String.format("%s,%s ]}", rangeFilter, additionalFilter);
+        } else {
+          // user didn't provide a filter, just cleanly close the range filter
+          rangeFilter = String.format("%s ]}", rangeFilter);
+        }
+
+        filters.add(rangeFilter);
+
+        lowestBound = splitKey;
+      }
+
+      // the last split key also opens a range, from this split key up to the
+      // end of the collection
+      if (lowestBound != null) {
+        String rangeFilter = String.format("{ $and: [ {\"_id\":{$gt:ObjectId(\"%s\")}}",
+            lowestBound);
+        if (additionalFilter != null && !additionalFilter.isEmpty()) {
+          rangeFilter = String.format("%s,%s ]}", rangeFilter, additionalFilter);
+        } else {
+          rangeFilter = String.format("%s ]}", rangeFilter);
+        }
+        filters.add(rangeFilter);
+      }
+      return filters;
+    }
+  }
+
+  private static class BoundedMongoDbReader extends BoundedSource.BoundedReader<String> {
+
+    private final BoundedMongoDbSource source;
+
+    private MongoClient client;
+    private MongoCursor<Document> cursor;
+    private String current;
+
+    public BoundedMongoDbReader(BoundedMongoDbSource source) {
+      this.source = source;
+    }
+
+    @Override
+    public boolean start() {
+      client = new MongoClient(new MongoClientURI(source.uri));
+
+      MongoDatabase mongoDatabase = client.getDatabase(source.database);
+
+      MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(source.collection);
+
+      if (source.filter == null) {
+        cursor = mongoCollection.find().iterator();
+      } else {
+        Document bson = Document.parse(source.filter);
+        cursor = mongoCollection.find(bson).iterator();
+      }
+
+      return advance();
+    }
+
+    @Override
+    public boolean advance() {
+      if (cursor.hasNext()) {
+        current = cursor.next().toJson();
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+    public BoundedSource<String> getCurrentSource() {
+      return source;
+    }
+
+    @Override
+    public String getCurrent() {
+      return current;
+    }
+
+    @Override
+    public void close() {
+      try {
+        if (cursor != null) {
+          cursor.close();
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Error closing MongoDB cursor", e);
+      }
+      try {
+        client.close();
+      } catch (Exception e) {
+        LOGGER.warn("Error closing MongoDB client", e);
+      }
+    }
+
+  }
+
+  /**
+   * A {@link PTransform} to write to a MongoDB database.
+   */
+  public static class Write extends PTransform<PCollection<String>, PDone> {
+
+    public Write withUri(String uri) {
+      return new Write(writer.withUri(uri));
+    }
+
+    public Write withDatabase(String database) {
+      return new Write(writer.withDatabase(database));
+    }
+
+    public Write withCollection(String collection) {
+      return new Write(writer.withCollection(collection));
+    }
+
+    public Write withBatchSize(long batchSize) {
+      return new Write(writer.withBatchSize(batchSize));
+    }
+
+    private final MongoDbWriter writer;
+
+    private Write(MongoDbWriter writer) {
+      this.writer = writer;
+    }
+
+    @Override
+    public PDone apply(PCollection<String> input) {
+      input.apply(ParDo.of(writer));
+      return PDone.in(input.getPipeline());
+    }
+
+    @Override
+    public void validate(PCollection<String> input) {
+      writer.validate();
+    }
+
+    private static class MongoDbWriter extends DoFn<String, Void> {
+
+      private final String uri;
+      private final String database;
+      private final String collection;
+      private final long batchSize;
+
+      private MongoClient client;
+      private List<Document> batch;
+
+      public MongoDbWriter(String uri, String database, String collection, long batchSize) {
+        this.uri = uri;
+        this.database = database;
+        this.collection = collection;
+        this.batchSize = batchSize;
+      }
+
+      public MongoDbWriter withUri(String uri) {
+        return new MongoDbWriter(uri, database, collection, batchSize);
+      }
+
+      public MongoDbWriter withDatabase(String database) {
+        return new MongoDbWriter(uri, database, collection, batchSize);
+      }
+
+      public MongoDbWriter withCollection(String collection) {
+        return new MongoDbWriter(uri, database, collection, batchSize);
+      }
+
+      public MongoDbWriter withBatchSize(long batchSize) {
+        return new MongoDbWriter(uri, database, collection, batchSize);
+      }
+
+      public void validate() {
+        Preconditions.checkNotNull(uri, "uri");
+        Preconditions.checkNotNull(database, "database");
+        Preconditions.checkNotNull(collection, "collection");
+        Preconditions.checkArgument(batchSize > 0, "batchSize must be greater than 0");
+      }
+
+      @Setup
+      public void createMongoClient() throws Exception {
+        client = new MongoClient(new MongoClientURI(uri));
+      }
+
+      @StartBundle
+      public void startBundle(Context ctx) throws Exception {
+        batch = new ArrayList<>();
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext ctx) throws Exception {
+        batch.add(Document.parse(ctx.element()));
+        // flush the batch as soon as it reaches the configured size
+        if (batch.size() >= batchSize) {
+          finishBundle(ctx);
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle(Context ctx) throws Exception {
+        if (batch.isEmpty()) {
+          // nothing to flush, and insertMany() rejects an empty list
+          return;
+        }
+        MongoDatabase mongoDatabase = client.getDatabase(database);
+        MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(collection);
+
+        mongoCollection.insertMany(batch);
+
+        batch.clear();
+      }
+
+      @Teardown
+      public void closeMongoClient() throws Exception {
+        client.close();
+        client = null;
+      }
+    }
+  }
+}
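
To make the splitting logic concrete: given the split keys 56, 109 and 256 from the
Javadoc example and no user filter, splitKeysToFilters() produces four range filters
shaped by the String.format() calls above (the key values are illustrative):

    { $and: [ {"_id":{$lte:ObjectId("56")}} ]}
    { $and: [ {"_id":{$gt:ObjectId("56"),$lte:ObjectId("109")}} ]}
    { $and: [ {"_id":{$gt:ObjectId("109"),$lte:ObjectId("256")}} ]}
    { $and: [ {"_id":{$gt:ObjectId("256")}} ]}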

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b60e368/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/package-info.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/package-info.java
new file mode 100644
index 0000000..fd08b58
--- /dev/null
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading and writing from MongoDB.
+ */
+package org.apache.beam.sdk.io.mongodb;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b60e368/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
new file mode 100644
index 0000000..308e071
--- /dev/null
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import static org.junit.Assert.assertEquals;
+
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoDatabase;
+
+import de.flapdoodle.embed.mongo.MongodExecutable;
+import de.flapdoodle.embed.mongo.MongodStarter;
+import de.flapdoodle.embed.mongo.config.IMongodConfig;
+import de.flapdoodle.embed.mongo.config.MongoCmdOptionsBuilder;
+import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
+import de.flapdoodle.embed.mongo.config.Net;
+import de.flapdoodle.embed.mongo.config.Storage;
+import de.flapdoodle.embed.mongo.distribution.Version;
+import de.flapdoodle.embed.process.io.file.Files;
+import de.flapdoodle.embed.process.runtime.Network;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.ArrayList;
+
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.bson.Document;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for the MongoDbIO.
+ */
+public class MongoDbIOTest implements Serializable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbIOTest.class);
+
+  private static final String MONGODB_LOCATION = "target/mongodb";
+  private static final int PORT = 27017;
+  private static final String DATABASE = "beam";
+  private static final String COLLECTION = "test";
+
+  private transient MongodExecutable mongodExecutable;
+
+  @Before
+  public void setup() throws Exception {
+    LOGGER.info("Starting MongoDB embedded instance");
+    try {
+      Files.forceDelete(new File(MONGODB_LOCATION));
+    } catch (Exception e) {
+      // ignore: the location may not exist on a first run
+    }
+    new File(MONGODB_LOCATION).mkdirs();
+    IMongodConfig mongodConfig = new MongodConfigBuilder()
+        .version(Version.Main.PRODUCTION)
+        .configServer(false)
+        .replication(new Storage(MONGODB_LOCATION, null, 0))
+        .net(new Net("localhost", PORT, Network.localhostIsIPv6()))
+        .cmdOptions(new MongoCmdOptionsBuilder()
+            .syncDelay(10)
+            .useNoPrealloc(true)
+            .useSmallFiles(true)
+            .useNoJournal(true)
+            .build())
+        .build();
+    mongodExecutable = MongodStarter.getDefaultInstance().prepare(mongodConfig);
+    mongodExecutable.start();
+
+    LOGGER.info("Insert test data");
+
+    MongoClient client = new MongoClient("localhost", PORT);
+    MongoDatabase database = client.getDatabase(DATABASE);
+
+    MongoCollection collection = database.getCollection(COLLECTION);
+
+    String[] scientists = {"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday",
+        "Newton", "Bohr", "Galilei", "Maxwell"};
+    for (int i = 1; i <= 1000; i++) {
+      int index = i % scientists.length;
+      Document document = new Document();
+      document.append("_id", i);
+      document.append("scientist", scientists[index]);
+      collection.insertOne(document);
+    }
+
+  }
+
+  @After
+  public void stop() throws Exception {
+    LOGGER.info("Stopping MongoDB instance");
+    mongodExecutable.stop();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testFullRead() throws Exception {
+    TestPipeline pipeline = TestPipeline.create();
+
+    PCollection<String> output = pipeline.apply(
+        MongoDbIO.read()
+          .withUri("mongodb://localhost:" + PORT)
+          .withDatabase(DATABASE)
+          .withCollection(COLLECTION));
+
+    PAssert.thatSingleton(output.apply("Count All", Count.<String>globally()))
+        .isEqualTo(1000L);
+
+    PAssert.that(output
+        .apply("Map Scientist", MapElements.via(new SimpleFunction<String, KV<String, Void>>() {
+          public KV<String, Void> apply(String input) {
+            Document bson = Document.parse(input);
+            return KV.of(bson.getString("scientist"), null);
+          }
+        }))
+        .apply("Count Scientist", Count.<String, Void>perKey())
+    ).satisfies(new SerializableFunction<Iterable<KV<String, Long>>, Void>() {
+      @Override
+      public Void apply(Iterable<KV<String, Long>> input) {
+        for (KV<String, Long> element : input) {
+          assertEquals(100L, element.getValue().longValue());
+        }
+        return null;
+      }
+    });
+
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testReadWithFilter() throws Exception {
+    TestPipeline pipeline = TestPipeline.create();
+
+    PCollection<String> output = pipeline.apply(
+        MongoDbIO.read()
+        .withUri("mongodb://localhost:" + PORT)
+        .withDatabase(DATABASE)
+        .withCollection(COLLECTION)
+        .withFilter("{\"scientist\":\"Einstein\"}"));
+
+    PAssert.thatSingleton(output.apply("Count", Count.<String>globally()))
+        .isEqualTo(100L);
+
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWrite() throws Exception {
+    TestPipeline pipeline = TestPipeline.create();
+
+    ArrayList<String> data = new ArrayList<>();
+    for (int i = 0; i < 10000; i++) {
+      data.add(String.format("{\"scientist\":\"Test %s\"}", i));
+    }
+    pipeline.apply(Create.of(data))
+        .apply(MongoDbIO.write().withUri("mongodb://localhost:" + PORT).withDatabase("test")
+            .withCollection("test"));
+
+    pipeline.run();
+
+    MongoClient client = new MongoClient("localhost", PORT);
+    MongoDatabase database = client.getDatabase("test");
+    MongoCollection collection = database.getCollection("test");
+
+    MongoCursor cursor = collection.find().iterator();
+
+    int count = 0;
+    while (cursor.hasNext()) {
+      count = count + 1;
+      cursor.next();
+    }
+
+    Assert.assertEquals(10000, count);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b60e368/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java
new file mode 100644
index 0000000..fd08b58
--- /dev/null
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading and writing from MongoDB.
+ */
+package org.apache.beam.sdk.io.mongodb;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b60e368/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 6cbd615..c4c32ed 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -38,6 +38,7 @@
     <module>jms</module>
     <module>kafka</module>
     <module>kinesis</module>
+    <module>mongodb</module>
   </modules>
 
 </project>
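
With the module registered here, the new IO can be built and tested in isolation
using standard Maven commands from a checkout of the repository, for example:

    mvn clean install -pl sdks/java/io/mongodb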


[2/2] incubator-beam git commit: [BEAM-456] This closes #671

Posted by jb...@apache.org.
[BEAM-456] This closes #671


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8ca68302
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8ca68302
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8ca68302

Branch: refs/heads/master
Commit: 8ca683026456c2973a88984cfc718ac8313707ea
Parents: a2c2238 4b60e36
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Sun Sep 4 21:57:19 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun Sep 4 21:57:19 2016 +0200

----------------------------------------------------------------------
 sdks/java/io/mongodb/pom.xml                    | 129 +++++
 .../apache/beam/sdk/io/mongodb/MongoDbIO.java   | 553 +++++++++++++++++++
 .../beam/sdk/io/mongodb/package-info.java       |  22 +
 .../beam/sdk/io/mongodb/MongoDbIOTest.java      | 209 +++++++
 .../beam/sdk/io/mongodb/package-info.java       |  22 +
 sdks/java/io/pom.xml                            |   1 +
 6 files changed, 936 insertions(+)
----------------------------------------------------------------------