You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/06/02 17:01:11 UTC

[1/2] beam git commit: [BEAM-2378] support FULL OUTER JOIN

Repository: beam
Updated Branches:
  refs/heads/master 2f9428c3e -> f1386c1cb


[BEAM-2378] support FULL OUTER JOIN


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c5918b2f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c5918b2f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c5918b2f

Branch: refs/heads/master
Commit: c5918b2f7ce36c755e2a285c42cc6b628b9ee319
Parents: 2f9428c
Author: James Xu <xu...@gmail.com>
Authored: Wed May 31 10:28:55 2017 +0800
Committer: Davor Bonaci <da...@google.com>
Committed: Fri Jun 2 10:00:57 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/extensions/joinlibrary/Join.java   |  65 ++++++-
 .../joinlibrary/OuterFullJoinTest.java          | 179 +++++++++++++++++++
 2 files changed, 243 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c5918b2f/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
index f4e6ccb..9acb048 100644
--- a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
+++ b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
@@ -141,7 +141,7 @@ public class Join {
    * @param <V1> Type of the values for the left collection.
    * @param <V2> Type of the values for the right collection.
    * @return A joined collection of KV where Key is the key and value is a
-   *         KV where Key is of type V1 and Value is type V2. Keys that
+   *         KV where Key is of type V1 and Value is type V2. Values that
    *         should be null or empty is replaced with nullValue.
    */
   public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> rightOuterJoin(
@@ -184,4 +184,67 @@ public class Join {
                            KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
                                       ((KvCoder) rightCollection.getCoder()).getValueCoder())));
   }
+
+  /**
+   * Full Outer Join of two collections of KV elements.
+   * @param leftCollection Left side collection to join.
+   * @param rightCollection Right side collection to join.
+   * @param leftNullValue Value to use when a right-side element has no match on the left side.
+   * @param rightNullValue Value to use when a left-side element has no match on the right side.
+   * @param <K> Type of the key for both collections
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   * @return A joined collection of KV where Key is the key and value is a
+   *         KV where Key is of type V1 and Value is type V2. Values that
+   *         should be null or empty are replaced with leftNullValue/rightNullValue.
+   */
+  public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> fullOuterJoin(
+      final PCollection<KV<K, V1>> leftCollection,
+      final PCollection<KV<K, V2>> rightCollection,
+      final V1 leftNullValue, final V2 rightNullValue) {
+    checkNotNull(leftCollection);
+    checkNotNull(rightCollection);
+    checkNotNull(leftNullValue);
+    checkNotNull(rightNullValue);
+
+    final TupleTag<V1> v1Tuple = new TupleTag<>();
+    final TupleTag<V2> v2Tuple = new TupleTag<>();
+
+    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
+        KeyedPCollectionTuple.of(v1Tuple, leftCollection)
+            .and(v2Tuple, rightCollection)
+            .apply(CoGroupByKey.<K>create());
+
+    return coGbkResultCollection.apply(ParDo.of(
+        new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            KV<K, CoGbkResult> e = c.element();
+
+            Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
+            Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
+            if (leftValuesIterable.iterator().hasNext()
+                && rightValuesIterable.iterator().hasNext()) {
+              for (V2 rightValue : rightValuesIterable) {
+                for (V1 leftValue : leftValuesIterable) {
+                  c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
+                }
+              }
+            } else if (leftValuesIterable.iterator().hasNext()
+                && !rightValuesIterable.iterator().hasNext()) {
+              for (V1 leftValue : leftValuesIterable) {
+                c.output(KV.of(e.getKey(), KV.of(leftValue, rightNullValue)));
+              }
+            } else if (!leftValuesIterable.iterator().hasNext()
+                && rightValuesIterable.iterator().hasNext()) {
+              for (V2 rightValue : rightValuesIterable) {
+                c.output(KV.of(e.getKey(), KV.of(leftNullValue, rightValue)));
+              }
+            }
+          }
+        }))
+        .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
+            KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
+                ((KvCoder) rightCollection.getCoder()).getValueCoder())));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c5918b2f/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java
new file mode 100644
index 0000000..cdf4f4f
--- /dev/null
+++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.joinlibrary;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests the Full Outer Join functionality.
+ */
+public class OuterFullJoinTest {
+
+  List<KV<String, Long>> leftListOfKv;
+  List<KV<String, String>> listRightOfKv;
+  List<KV<String, KV<Long, String>>> expectedResult;
+
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
+  @Before
+  public void setup() {
+
+    leftListOfKv = new ArrayList<>();
+    listRightOfKv = new ArrayList<>();
+
+    expectedResult = new ArrayList<>();
+  }
+
+  @Test
+  public void testJoinOneToOneMapping() {
+    leftListOfKv.add(KV.of("Key1", 5L));
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key1", "foo"));
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.fullOuterJoin(
+      leftCollection, rightCollection, -1L, "");
+
+    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToManyMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    listRightOfKv.add(KV.of("Key2", "gazonk"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.fullOuterJoin(
+      leftCollection, rightCollection, -1L, "");
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinManyToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    leftListOfKv.add(KV.of("Key2", 6L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.fullOuterJoin(
+      leftCollection, rightCollection, -1L, "");
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinNoneToNoneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key3", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.fullOuterJoin(
+      leftCollection, rightCollection, -1L, "");
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "")));
+    expectedResult.add(KV.of("Key3", KV.of(-1L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+    p.run();
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinLeftCollectionNull() {
+    p.enableAbandonedNodeEnforcement(false);
+    Join.fullOuterJoin(
+        null,
+        p.apply(
+            Create.of(listRightOfKv)
+                .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))),
+        "", "");
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinRightCollectionNull() {
+    p.enableAbandonedNodeEnforcement(false);
+    Join.fullOuterJoin(
+        p.apply(
+            Create.of(leftListOfKv).withCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()))),
+        null,
+        -1L, -1L);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinLeftNullValueIsNull() {
+    p.enableAbandonedNodeEnforcement(false);
+    Join.fullOuterJoin(
+        p.apply("CreateLeft", Create.empty(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()))),
+        p.apply(
+            "CreateRight", Create.empty(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))),
+        null, "");
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinRightNullValueIsNull() {
+    p.enableAbandonedNodeEnforcement(false);
+    Join.fullOuterJoin(
+        p.apply("CreateLeft", Create.empty(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()))),
+        p.apply(
+            "CreateRight", Create.empty(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))),
+        -1L, null);
+  }
+}


[2/2] beam git commit: This closes #3267

Posted by da...@apache.org.
This closes #3267


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f1386c1c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f1386c1c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f1386c1c

Branch: refs/heads/master
Commit: f1386c1cb2bc39b571ee6e94acc1612d22ae69b6
Parents: 2f9428c c5918b2
Author: Davor Bonaci <da...@google.com>
Authored: Fri Jun 2 10:01:00 2017 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Fri Jun 2 10:01:00 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/extensions/joinlibrary/Join.java   |  65 ++++++-
 .../joinlibrary/OuterFullJoinTest.java          | 179 +++++++++++++++++++
 2 files changed, 243 insertions(+), 1 deletion(-)
----------------------------------------------------------------------