You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/06/02 17:01:11 UTC
[1/2] beam git commit: [BEAM-2378] support FULL OUTER JOIN
Repository: beam
Updated Branches:
refs/heads/master 2f9428c3e -> f1386c1cb
[BEAM-2378] support FULL OUTER JOIN
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c5918b2f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c5918b2f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c5918b2f
Branch: refs/heads/master
Commit: c5918b2f7ce36c755e2a285c42cc6b628b9ee319
Parents: 2f9428c
Author: James Xu <xu...@gmail.com>
Authored: Wed May 31 10:28:55 2017 +0800
Committer: Davor Bonaci <da...@google.com>
Committed: Fri Jun 2 10:00:57 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/extensions/joinlibrary/Join.java | 65 ++++++-
.../joinlibrary/OuterFullJoinTest.java | 179 +++++++++++++++++++
2 files changed, 243 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c5918b2f/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
index f4e6ccb..9acb048 100644
--- a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
+++ b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
@@ -141,7 +141,7 @@ public class Join {
* @param <V1> Type of the values for the left collection.
* @param <V2> Type of the values for the right collection.
* @return A joined collection of KV where Key is the key and value is a
- * KV where Key is of type V1 and Value is type V2. Keys that
+ * KV where Key is of type V1 and Value is type V2. Values that
* should be null or empty is replaced with nullValue.
*/
public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> rightOuterJoin(
@@ -184,4 +184,67 @@ public class Join {
KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
((KvCoder) rightCollection.getCoder()).getValueCoder())));
}
+
+ /**
+ * Full Outer Join of two collections of KV elements, keeping unmatched keys from both sides.
+ * @param leftCollection Left side collection to join.
+ * @param rightCollection Right side collection to join.
+ * @param leftNullValue Value used for the left side when a key exists only in rightCollection.
+ * @param rightNullValue Value used for the right side when a key exists only in leftCollection.
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of the values for the right collection.
+ * @return A joined collection of KV where Key is the key and value is a
+ * KV where Key is of type V1 and Value is type V2. Values that
+ * should be null or empty are replaced with leftNullValue/rightNullValue.
+ */
+ public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> fullOuterJoin(
+ final PCollection<KV<K, V1>> leftCollection,
+ final PCollection<KV<K, V2>> rightCollection,
+ final V1 leftNullValue, final V2 rightNullValue) {
+ checkNotNull(leftCollection); // fail fast: every argument is required (tests expect NullPointerException)
+ checkNotNull(rightCollection);
+ checkNotNull(leftNullValue);
+ checkNotNull(rightNullValue);
+
+ final TupleTag<V1> v1Tuple = new TupleTag<>(); // identifies the left values inside each CoGbkResult
+ final TupleTag<V2> v2Tuple = new TupleTag<>(); // identifies the right values inside each CoGbkResult
+
+ PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
+ KeyedPCollectionTuple.of(v1Tuple, leftCollection)
+ .and(v2Tuple, rightCollection)
+ .apply(CoGroupByKey.<K>create());
+
+ return coGbkResultCollection.apply(ParDo.of(
+ new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ KV<K, CoGbkResult> e = c.element();
+
+ Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
+ Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
+ if (leftValuesIterable.iterator().hasNext()
+ && rightValuesIterable.iterator().hasNext()) { // key on both sides: emit every (left, right) pair
+ for (V2 rightValue : rightValuesIterable) {
+ for (V1 leftValue : leftValuesIterable) { // left values are re-iterated once per right value
+ c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
+ }
+ }
+ } else if (leftValuesIterable.iterator().hasNext()
+ && !rightValuesIterable.iterator().hasNext()) { // left-only key: pad right side with rightNullValue
+ for (V1 leftValue : leftValuesIterable) {
+ c.output(KV.of(e.getKey(), KV.of(leftValue, rightNullValue)));
+ }
+ } else if (!leftValuesIterable.iterator().hasNext()
+ && rightValuesIterable.iterator().hasNext()) { // right-only key: pad left side with leftNullValue
+ for (V2 rightValue : rightValuesIterable) {
+ c.output(KV.of(e.getKey(), KV.of(leftNullValue, rightValue)));
+ }
+ } // NOTE(review): nothing is emitted when both sides are empty; presumably CoGroupByKey never yields such a key
+ }
+ }))
+ .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(), // output coder is assembled from the input coders; the raw casts assume both inputs use KvCoder
+ KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
+ ((KvCoder) rightCollection.getCoder()).getValueCoder())));
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c5918b2f/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java
new file mode 100644
index 0000000..cdf4f4f
--- /dev/null
+++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.joinlibrary;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests the full outer join implemented by {@link Join#fullOuterJoin}.
+ */
+public class OuterFullJoinTest {
+
+ List<KV<String, Long>> leftListOfKv;
+ List<KV<String, String>> listRightOfKv;
+ List<KV<String, KV<Long, String>>> expectedResult;
+
+ @Rule
+ public final transient TestPipeline p = TestPipeline.create();
+
+ @Before
+ public void setup() { // fresh, empty input and expectation lists for every test
+
+ leftListOfKv = new ArrayList<>();
+ listRightOfKv = new ArrayList<>();
+
+ expectedResult = new ArrayList<>();
+ }
+
+ @Test
+ public void testJoinOneToOneMapping() {
+ leftListOfKv.add(KV.of("Key1", 5L));
+ leftListOfKv.add(KV.of("Key2", 4L));
+ PCollection<KV<String, Long>> leftCollection = p
+ .apply("CreateLeft", Create.of(leftListOfKv));
+
+ listRightOfKv.add(KV.of("Key1", "foo"));
+ listRightOfKv.add(KV.of("Key2", "bar"));
+ PCollection<KV<String, String>> rightCollection = p
+ .apply("CreateRight", Create.of(listRightOfKv));
+
+ PCollection<KV<String, KV<Long, String>>> output = Join.fullOuterJoin(
+ leftCollection, rightCollection, -1L, ""); // leftNullValue = -1L, rightNullValue = ""
+
+ expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
+ expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+ PAssert.that(output).containsInAnyOrder(expectedResult);
+
+ p.run();
+ }
+
+ @Test
+ public void testJoinOneToManyMapping() {
+ leftListOfKv.add(KV.of("Key2", 4L));
+ PCollection<KV<String, Long>> leftCollection = p
+ .apply("CreateLeft", Create.of(leftListOfKv));
+
+ listRightOfKv.add(KV.of("Key2", "bar"));
+ listRightOfKv.add(KV.of("Key2", "gazonk"));
+ PCollection<KV<String, String>> rightCollection = p
+ .apply("CreateRight", Create.of(listRightOfKv));
+
+ PCollection<KV<String, KV<Long, String>>> output = Join.fullOuterJoin(
+ leftCollection, rightCollection, -1L, ""); // leftNullValue = -1L, rightNullValue = ""
+
+ expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+ expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
+ PAssert.that(output).containsInAnyOrder(expectedResult);
+
+ p.run();
+ }
+
+ @Test
+ public void testJoinManyToOneMapping() {
+ leftListOfKv.add(KV.of("Key2", 4L));
+ leftListOfKv.add(KV.of("Key2", 6L));
+ PCollection<KV<String, Long>> leftCollection = p
+ .apply("CreateLeft", Create.of(leftListOfKv));
+
+ listRightOfKv.add(KV.of("Key2", "bar"));
+ PCollection<KV<String, String>> rightCollection = p
+ .apply("CreateRight", Create.of(listRightOfKv));
+
+ PCollection<KV<String, KV<Long, String>>> output = Join.fullOuterJoin(
+ leftCollection, rightCollection, -1L, ""); // leftNullValue = -1L, rightNullValue = ""
+
+ expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+ expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
+ PAssert.that(output).containsInAnyOrder(expectedResult);
+
+ p.run();
+ }
+
+ @Test
+ public void testJoinNoneToNoneMapping() {
+ leftListOfKv.add(KV.of("Key2", 4L));
+ PCollection<KV<String, Long>> leftCollection = p
+ .apply("CreateLeft", Create.of(leftListOfKv));
+
+ listRightOfKv.add(KV.of("Key3", "bar"));
+ PCollection<KV<String, String>> rightCollection = p
+ .apply("CreateRight", Create.of(listRightOfKv));
+
+ PCollection<KV<String, KV<Long, String>>> output = Join.fullOuterJoin(
+ leftCollection, rightCollection, -1L, ""); // leftNullValue = -1L, rightNullValue = ""
+
+ expectedResult.add(KV.of("Key2", KV.of(4L, ""))); // left-only key: right side padded with ""
+ expectedResult.add(KV.of("Key3", KV.of(-1L, "bar"))); // right-only key: left side padded with -1L
+ PAssert.that(output).containsInAnyOrder(expectedResult);
+ p.run();
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testJoinLeftCollectionNull() {
+ p.enableAbandonedNodeEnforcement(false); // pipeline is never run because the join call throws
+ Join.fullOuterJoin(
+ null,
+ p.apply(
+ Create.of(listRightOfKv)
+ .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))),
+ "", "");
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testJoinRightCollectionNull() {
+ p.enableAbandonedNodeEnforcement(false); // pipeline is never run because the join call throws
+ Join.fullOuterJoin(
+ p.apply(
+ Create.of(leftListOfKv).withCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()))),
+ null,
+ -1L, -1L);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testJoinLeftNullValueIsNull() {
+ p.enableAbandonedNodeEnforcement(false); // pipeline is never run because the join call throws
+ Join.fullOuterJoin(
+ p.apply("CreateLeft", Create.empty(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()))),
+ p.apply(
+ "CreateRight", Create.empty(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))),
+ null, "");
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testJoinRightNullValueIsNull() {
+ p.enableAbandonedNodeEnforcement(false); // pipeline is never run because the join call throws
+ Join.fullOuterJoin(
+ p.apply("CreateLeft", Create.empty(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()))),
+ p.apply(
+ "CreateRight", Create.empty(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))),
+ -1L, null);
+ }
+}
[2/2] beam git commit: This closes #3267
Posted by da...@apache.org.
This closes #3267
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f1386c1c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f1386c1c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f1386c1c
Branch: refs/heads/master
Commit: f1386c1cb2bc39b571ee6e94acc1612d22ae69b6
Parents: 2f9428c c5918b2
Author: Davor Bonaci <da...@google.com>
Authored: Fri Jun 2 10:01:00 2017 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Fri Jun 2 10:01:00 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/extensions/joinlibrary/Join.java | 65 ++++++-
.../joinlibrary/OuterFullJoinTest.java | 179 +++++++++++++++++++
2 files changed, 243 insertions(+), 1 deletion(-)
----------------------------------------------------------------------