You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/11 22:16:56 UTC
[1/2] beam git commit: [BEAM-2256] Add the last previous range filter
Repository: beam
Updated Branches:
refs/heads/release-2.0.0 74bcc0237 -> cf1ce7b24
[BEAM-2256] Add the last previous range filter
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d0960c64
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d0960c64
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d0960c64
Branch: refs/heads/release-2.0.0
Commit: d0960c642d0bf0675f96e0495c7dbe303714c68d
Parents: 74bcc02
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Thu May 11 22:09:50 2017 +0200
Committer: Luke Cwik <lc...@google.com>
Committed: Thu May 11 15:16:14 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/io/mongodb/MongoDbIO.java | 47 ++++++++++++++------
.../beam/sdk/io/mongodb/MongoDbIOTest.java | 18 ++++++++
2 files changed, 52 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d0960c64/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 7236a50..620df74 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
@@ -184,7 +185,11 @@ public class MongoDbIO {
}
}
- private static class BoundedMongoDbSource extends BoundedSource<Document> {
+ /**
+ * A MongoDB {@link BoundedSource} reading {@link Document} from a given instance.
+ */
+ @VisibleForTesting
+ static class BoundedMongoDbSource extends BoundedSource<Document> {
private Read spec;
private BoundedMongoDbSource(Read spec) {
@@ -294,7 +299,8 @@ public class MongoDbIO {
* @param additionalFilter A custom (user) additional filter to append to the range filters.
* @return A list of filters containing the ranges.
*/
- private static List<String> splitKeysToFilters(List<Document> splitKeys, String
+ @VisibleForTesting
+ static List<String> splitKeysToFilters(List<Document> splitKeys, String
additionalFilter) {
ArrayList<String> filters = new ArrayList<>();
String lowestBound = null; // lower boundary (previous split in the iteration)
@@ -306,30 +312,45 @@ public class MongoDbIO {
// the range from the beginning up to this split
rangeFilter = String.format("{ $and: [ {\"_id\":{$lte:ObjectId(\"%s\")}}",
splitKey);
+ filters.add(formatFilter(rangeFilter, additionalFilter));
} else if (i == splitKeys.size() - 1) {
- // this is the last split in the list, the filter defines
- // the range from the split up to the end
+ // this is the last split in the list, the filters define
+ // the range from the previous split to the current split and also
+ // the current split to the end
+ rangeFilter = String.format("{ $and: [ {\"_id\":{$gt:ObjectId(\"%s\"),"
+ + "$lte:ObjectId(\"%s\")}}", lowestBound, splitKey);
+ filters.add(formatFilter(rangeFilter, additionalFilter));
rangeFilter = String.format("{ $and: [ {\"_id\":{$gt:ObjectId(\"%s\")}}",
splitKey);
+ filters.add(formatFilter(rangeFilter, additionalFilter));
} else {
// we are between two splits
rangeFilter = String.format("{ $and: [ {\"_id\":{$gt:ObjectId(\"%s\"),"
+ "$lte:ObjectId(\"%s\")}}", lowestBound, splitKey);
+ filters.add(formatFilter(rangeFilter, additionalFilter));
}
- if (additionalFilter != null && !additionalFilter.isEmpty()) {
- // user provided a filter, we append the user filter to the range filter
- rangeFilter = String.format("%s,%s ]}", rangeFilter, additionalFilter);
- } else {
- // user didn't provide a filter, just cleany close the range filter
- rangeFilter = String.format("%s ]}", rangeFilter);
- }
-
- filters.add(rangeFilter);
lowestBound = splitKey;
}
return filters;
}
+
+ /**
+ * Closes a range filter's {@code $and} array, first appending the user's filter if set.
+ *
+ * @param filter The range filter, still missing the closing of its {@code $and} array.
+ * @param additionalFilter The user's filter; treated as unspecified when null or empty.
+ * @return The range filter with its {@code $and} array and outer document closed.
+ */
+ private static String formatFilter(String filter, @Nullable String additionalFilter) {
+ if (additionalFilter != null && !additionalFilter.isEmpty()) {
+ // user provided a filter: add it as one more $and clause, then close the filter
+ return String.format("%s,%s ]}", filter, additionalFilter);
+ } else {
+ // no user filter: only close the $and array and the enclosing document
+ return String.format("%s ]}", filter);
+ }
+ }
}
private static class BoundedMongoDbReader extends BoundedSource.BoundedReader<Document> {
http://git-wip-us.apache.org/repos/asf/beam/blob/d0960c64/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
index 454c6ba..cd26b48 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
@@ -38,6 +38,8 @@ import java.io.File;
import java.io.Serializable;
import java.net.ServerSocket;
import java.util.ArrayList;
+import java.util.List;
+
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
@@ -139,6 +141,22 @@ public class MongoDbIOTest implements Serializable {
}
@Test
+ public void testSplitIntoFilters() throws Exception { // range filters from 3 split keys
+ ArrayList<Document> documents = new ArrayList<>();
+ documents.add(new Document("_id", 56));
+ documents.add(new Document("_id", 109));
+ documents.add(new Document("_id", 256));
+ List<String> filters = MongoDbIO.BoundedMongoDbSource.splitKeysToFilters(documents, null);
+ assertEquals(4, filters.size()); // 3 split keys -> 4 ranges, incl. one above the last key
+ assertEquals("{ $and: [ {\"_id\":{$lte:ObjectId(\"56\")}} ]}", filters.get(0));
+ assertEquals("{ $and: [ {\"_id\":{$gt:ObjectId(\"56\"),$lte:ObjectId(\"109\")}} ]}",
+ filters.get(1));
+ assertEquals("{ $and: [ {\"_id\":{$gt:ObjectId(\"109\"),$lte:ObjectId(\"256\")}} ]}",
+ filters.get(2)); // the range previously missing before BEAM-2256
+ assertEquals("{ $and: [ {\"_id\":{$gt:ObjectId(\"256\")}} ]}", filters.get(3));
+ }
+
+ @Test
public void testFullRead() throws Exception {
PCollection<Document> output = pipeline.apply(
[2/2] beam git commit: Cherry pick #3093: [BEAM-2256] Add the last previous range filter
Posted by lc...@apache.org.
Cherry pick #3093: [BEAM-2256] Add the last previous range filter
This closes #3100
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cf1ce7b2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cf1ce7b2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cf1ce7b2
Branch: refs/heads/release-2.0.0
Commit: cf1ce7b247f68a238022b17fb5ae34cec4aca24b
Parents: 74bcc02 d0960c6
Author: Luke Cwik <lc...@google.com>
Authored: Thu May 11 15:16:43 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu May 11 15:16:43 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/io/mongodb/MongoDbIO.java | 47 ++++++++++++++------
.../beam/sdk/io/mongodb/MongoDbIOTest.java | 18 ++++++++
2 files changed, 52 insertions(+), 13 deletions(-)
----------------------------------------------------------------------