You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/04/26 02:50:22 UTC
[1/3] incubator-beam git commit: Reorganize Java packages
Repository: incubator-beam
Updated Branches:
refs/heads/master d5814a38d -> 0f7b81615
Reorganize Java packages
Move org.apache.contrib.joinlibrary to org.apache.beam.sdk.extensions.joinlibrary.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e4059412
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e4059412
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e4059412
Branch: refs/heads/master
Commit: e4059412d111a223ee7cfda0004a62699b38f41f
Parents: e2ca889
Author: Davor Bonaci <da...@google.com>
Authored: Mon Apr 25 16:45:59 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Mon Apr 25 17:32:23 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/extensions/joinlibrary/Join.java | 186 +++++++++++++++++++
.../org/apache/contrib/joinlibrary/Join.java | 186 -------------------
.../extensions/joinlibrary/InnerJoinTest.java | 143 ++++++++++++++
.../joinlibrary/OuterLeftJoinTest.java | 153 +++++++++++++++
.../joinlibrary/OuterRightJoinTest.java | 153 +++++++++++++++
.../contrib/joinlibrary/InnerJoinTest.java | 143 --------------
.../contrib/joinlibrary/OuterLeftJoinTest.java | 153 ---------------
.../contrib/joinlibrary/OuterRightJoinTest.java | 153 ---------------
8 files changed, 635 insertions(+), 635 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
new file mode 100644
index 0000000..968a613
--- /dev/null
+++ b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.joinlibrary;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import com.google.common.base.Preconditions;
+
+/**
+ * Utility class with different versions of joins. All methods join two collections of
+ * key/value pairs (KV).
+ *
+ * <p>Each join co-groups both inputs with {@link CoGroupByKey} and then uses a {@link ParDo} to
+ * emit the matching value pairs for every key. Both inputs must be encoded with a
+ * {@link KvCoder}; the coder of the joined output is assembled from their key and value coders.
+ */
+public class Join {
+
+  /**
+   * Inner join of two collections of KV elements.
+   *
+   * <p>For every key that appears in both collections, one element is emitted per combination
+   * of a left value and a right value. Keys that appear in only one collection produce no output.
+   *
+   * @param leftCollection Left side collection to join.
+   * @param rightCollection Right side collection to join.
+   * @param <K> Type of the key for both collections.
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   * @return A joined collection of KV where Key is the key and value is a
+   *     KV where Key is of type V1 and Value is type V2.
+   * @throws NullPointerException if either collection is null.
+   */
+  public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> innerJoin(
+      final PCollection<KV<K, V1>> leftCollection, final PCollection<KV<K, V2>> rightCollection) {
+    Preconditions.checkNotNull(leftCollection, "leftCollection must not be null");
+    Preconditions.checkNotNull(rightCollection, "rightCollection must not be null");
+
+    final TupleTag<V1> v1Tuple = new TupleTag<>();
+    final TupleTag<V2> v2Tuple = new TupleTag<>();
+
+    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
+        coGroupByKey(leftCollection, v1Tuple, rightCollection, v2Tuple);
+
+    return coGbkResultCollection.apply(ParDo.of(
+        new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            KV<K, CoGbkResult> e = c.element();
+
+            Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
+            Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
+
+            // Cartesian product of the values sharing this key; empty if either side is empty.
+            for (V1 leftValue : leftValuesIterable) {
+              for (V2 rightValue : rightValuesIterable) {
+                c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
+              }
+            }
+          }
+        }))
+        .setCoder(joinedCoder(leftCollection, rightCollection));
+  }
+
+  /**
+   * Left Outer Join of two collections of KV elements.
+   *
+   * <p>Every left element is emitted at least once: paired with each right value sharing its
+   * key, or paired with {@code nullValue} when the right collection has no value for that key.
+   *
+   * <p>NOTE(review): {@code nullValue} is captured by the anonymous {@link DoFn}, which the
+   * runner presumably serializes, so its type likely must be {@link java.io.Serializable}.
+   *
+   * @param leftCollection Left side collection to join.
+   * @param rightCollection Right side collection to join.
+   * @param nullValue Value to use as null value when right side does not match left side.
+   * @param <K> Type of the key for both collections.
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   * @return A joined collection of KV where Key is the key and value is a
+   *     KV where Key is of type V1 and Value is type V2. Right values that
+   *     are missing for a key are replaced with nullValue.
+   * @throws NullPointerException if either collection or {@code nullValue} is null.
+   */
+  public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> leftOuterJoin(
+      final PCollection<KV<K, V1>> leftCollection,
+      final PCollection<KV<K, V2>> rightCollection,
+      final V2 nullValue) {
+    Preconditions.checkNotNull(leftCollection, "leftCollection must not be null");
+    Preconditions.checkNotNull(rightCollection, "rightCollection must not be null");
+    Preconditions.checkNotNull(nullValue, "nullValue must not be null");
+
+    final TupleTag<V1> v1Tuple = new TupleTag<>();
+    final TupleTag<V2> v2Tuple = new TupleTag<>();
+
+    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
+        coGroupByKey(leftCollection, v1Tuple, rightCollection, v2Tuple);
+
+    return coGbkResultCollection.apply(ParDo.of(
+        new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            KV<K, CoGbkResult> e = c.element();
+
+            Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
+            Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
+            // Whether any right value exists is the same for every left value, so check it
+            // once rather than creating a new iterator on each iteration.
+            boolean hasRightValues = rightValuesIterable.iterator().hasNext();
+
+            for (V1 leftValue : leftValuesIterable) {
+              if (hasRightValues) {
+                for (V2 rightValue : rightValuesIterable) {
+                  c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
+                }
+              } else {
+                c.output(KV.of(e.getKey(), KV.of(leftValue, nullValue)));
+              }
+            }
+          }
+        }))
+        .setCoder(joinedCoder(leftCollection, rightCollection));
+  }
+
+  /**
+   * Right Outer Join of two collections of KV elements.
+   *
+   * <p>Every right element is emitted at least once: paired with each left value sharing its
+   * key, or paired with {@code nullValue} when the left collection has no value for that key.
+   *
+   * <p>NOTE(review): {@code nullValue} is captured by the anonymous {@link DoFn}, which the
+   * runner presumably serializes, so its type likely must be {@link java.io.Serializable}.
+   *
+   * @param leftCollection Left side collection to join.
+   * @param rightCollection Right side collection to join.
+   * @param nullValue Value to use as null value when left side does not match right side.
+   * @param <K> Type of the key for both collections.
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   * @return A joined collection of KV where Key is the key and value is a
+   *     KV where Key is of type V1 and Value is type V2. Left values that
+   *     are missing for a key are replaced with nullValue.
+   * @throws NullPointerException if either collection or {@code nullValue} is null.
+   */
+  public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> rightOuterJoin(
+      final PCollection<KV<K, V1>> leftCollection,
+      final PCollection<KV<K, V2>> rightCollection,
+      final V1 nullValue) {
+    Preconditions.checkNotNull(leftCollection, "leftCollection must not be null");
+    Preconditions.checkNotNull(rightCollection, "rightCollection must not be null");
+    Preconditions.checkNotNull(nullValue, "nullValue must not be null");
+
+    final TupleTag<V1> v1Tuple = new TupleTag<>();
+    final TupleTag<V2> v2Tuple = new TupleTag<>();
+
+    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
+        coGroupByKey(leftCollection, v1Tuple, rightCollection, v2Tuple);
+
+    return coGbkResultCollection.apply(ParDo.of(
+        new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            KV<K, CoGbkResult> e = c.element();
+
+            Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
+            Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
+            // Whether any left value exists is the same for every right value, so check it
+            // once rather than creating a new iterator on each iteration.
+            boolean hasLeftValues = leftValuesIterable.iterator().hasNext();
+
+            for (V2 rightValue : rightValuesIterable) {
+              if (hasLeftValues) {
+                for (V1 leftValue : leftValuesIterable) {
+                  c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
+                }
+              } else {
+                c.output(KV.of(e.getKey(), KV.of(nullValue, rightValue)));
+              }
+            }
+          }
+        }))
+        .setCoder(joinedCoder(leftCollection, rightCollection));
+  }
+
+  /**
+   * Co-groups both inputs by key; in each resulting {@link CoGbkResult}, left values are
+   * retrievable under {@code leftTag} and right values under {@code rightTag}.
+   */
+  private static <K, V1, V2> PCollection<KV<K, CoGbkResult>> coGroupByKey(
+      PCollection<KV<K, V1>> leftCollection, TupleTag<V1> leftTag,
+      PCollection<KV<K, V2>> rightCollection, TupleTag<V2> rightTag) {
+    return KeyedPCollectionTuple.of(leftTag, leftCollection)
+        .and(rightTag, rightCollection)
+        .apply(CoGroupByKey.<K>create());
+  }
+
+  /**
+   * Builds the coder of the joined output, {@code KvCoder<K, KvCoder<V1, V2>>}, from the key
+   * coder of the left input and the value coders of both inputs.
+   *
+   * @throws ClassCastException if either input is not encoded with a {@link KvCoder}.
+   */
+  private static <K, V1, V2> KvCoder<K, KV<V1, V2>> joinedCoder(
+      PCollection<KV<K, V1>> leftCollection, PCollection<KV<K, V2>> rightCollection) {
+    KvCoder<K, V1> leftCoder = (KvCoder<K, V1>) leftCollection.getCoder();
+    KvCoder<K, V2> rightCoder = (KvCoder<K, V2>) rightCollection.getCoder();
+    return KvCoder.of(
+        leftCoder.getKeyCoder(),
+        KvCoder.of(leftCoder.getValueCoder(), rightCoder.getValueCoder()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java b/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java
deleted file mode 100644
index 6421e97..0000000
--- a/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.joinlibrary;
-
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-import com.google.common.base.Preconditions;
-
-/**
- * Utility class with different versions of joins. All methods join two collections of
- * key/value pairs (KV).
- */
-public class Join {
-
- /**
- * Inner join of two collections of KV elements.
- * @param leftCollection Left side collection to join.
- * @param rightCollection Right side collection to join.
- * @param <K> Type of the key for both collections
- * @param <V1> Type of the values for the left collection.
- * @param <V2> Type of the values for the right collection.
- * @return A joined collection of KV where Key is the key and value is a
- * KV where Key is of type V1 and Value is type V2.
- */
- public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> innerJoin(
- final PCollection<KV<K, V1>> leftCollection, final PCollection<KV<K, V2>> rightCollection) {
- Preconditions.checkNotNull(leftCollection);
- Preconditions.checkNotNull(rightCollection);
-
- final TupleTag<V1> v1Tuple = new TupleTag<>();
- final TupleTag<V2> v2Tuple = new TupleTag<>();
-
- PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
- KeyedPCollectionTuple.of(v1Tuple, leftCollection)
- .and(v2Tuple, rightCollection)
- .apply(CoGroupByKey.<K>create());
-
- return coGbkResultCollection.apply(ParDo.of(
- new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
- @Override
- public void processElement(ProcessContext c) {
- KV<K, CoGbkResult> e = c.element();
-
- Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
- Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
-
- for (V1 leftValue : leftValuesIterable) {
- for (V2 rightValue : rightValuesIterable) {
- c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
- }
- }
- }
- }))
- .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
- KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
- ((KvCoder) rightCollection.getCoder()).getValueCoder())));
- }
-
- /**
- * Left Outer Join of two collections of KV elements.
- * @param leftCollection Left side collection to join.
- * @param rightCollection Right side collection to join.
- * @param nullValue Value to use as null value when right side do not match left side.
- * @param <K> Type of the key for both collections
- * @param <V1> Type of the values for the left collection.
- * @param <V2> Type of the values for the right collection.
- * @return A joined collection of KV where Key is the key and value is a
- * KV where Key is of type V1 and Value is type V2. Values that
- * should be null or empty is replaced with nullValue.
- */
- public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> leftOuterJoin(
- final PCollection<KV<K, V1>> leftCollection,
- final PCollection<KV<K, V2>> rightCollection,
- final V2 nullValue) {
- Preconditions.checkNotNull(leftCollection);
- Preconditions.checkNotNull(rightCollection);
- Preconditions.checkNotNull(nullValue);
-
- final TupleTag<V1> v1Tuple = new TupleTag<>();
- final TupleTag<V2> v2Tuple = new TupleTag<>();
-
- PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
- KeyedPCollectionTuple.of(v1Tuple, leftCollection)
- .and(v2Tuple, rightCollection)
- .apply(CoGroupByKey.<K>create());
-
- return coGbkResultCollection.apply(ParDo.of(
- new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
- @Override
- public void processElement(ProcessContext c) {
- KV<K, CoGbkResult> e = c.element();
-
- Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
- Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
-
- for (V1 leftValue : leftValuesIterable) {
- if (rightValuesIterable.iterator().hasNext()) {
- for (V2 rightValue : rightValuesIterable) {
- c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
- }
- } else {
- c.output(KV.of(e.getKey(), KV.of(leftValue, nullValue)));
- }
- }
- }
- }))
- .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
- KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
- ((KvCoder) rightCollection.getCoder()).getValueCoder())));
- }
-
- /**
- * Right Outer Join of two collections of KV elements.
- * @param leftCollection Left side collection to join.
- * @param rightCollection Right side collection to join.
- * @param nullValue Value to use as null value when left side do not match right side.
- * @param <K> Type of the key for both collections
- * @param <V1> Type of the values for the left collection.
- * @param <V2> Type of the values for the right collection.
- * @return A joined collection of KV where Key is the key and value is a
- * KV where Key is of type V1 and Value is type V2. Keys that
- * should be null or empty is replaced with nullValue.
- */
- public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> rightOuterJoin(
- final PCollection<KV<K, V1>> leftCollection,
- final PCollection<KV<K, V2>> rightCollection,
- final V1 nullValue) {
- Preconditions.checkNotNull(leftCollection);
- Preconditions.checkNotNull(rightCollection);
- Preconditions.checkNotNull(nullValue);
-
- final TupleTag<V1> v1Tuple = new TupleTag<>();
- final TupleTag<V2> v2Tuple = new TupleTag<>();
-
- PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
- KeyedPCollectionTuple.of(v1Tuple, leftCollection)
- .and(v2Tuple, rightCollection)
- .apply(CoGroupByKey.<K>create());
-
- return coGbkResultCollection.apply(ParDo.of(
- new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
- @Override
- public void processElement(ProcessContext c) {
- KV<K, CoGbkResult> e = c.element();
-
- Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
- Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
-
- for (V2 rightValue : rightValuesIterable) {
- if (leftValuesIterable.iterator().hasNext()) {
- for (V1 leftValue : leftValuesIterable) {
- c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
- }
- } else {
- c.output(KV.of(e.getKey(), KV.of(nullValue, rightValue)));
- }
- }
- }
- }))
- .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
- KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
- ((KvCoder) rightCollection.getCoder()).getValueCoder())));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java
new file mode 100644
index 0000000..6622fdc
--- /dev/null
+++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.joinlibrary;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link Join#innerJoin}.
+ */
+public class InnerJoinTest {
+
+  private Pipeline p;
+  private List<KV<String, Long>> leftListOfKv;
+  private List<KV<String, String>> listRightOfKv;
+  private List<KV<String, KV<Long, String>>> expectedResult;
+
+  @Before
+  public void setup() {
+
+    p = TestPipeline.create();
+    leftListOfKv = new ArrayList<>();
+    listRightOfKv = new ArrayList<>();
+
+    expectedResult = new ArrayList<>();
+  }
+
+  @Test
+  public void testJoinOneToOneMapping() {
+    leftListOfKv.add(KV.of("Key1", 5L));
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection =
+        p.apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key1", "foo"));
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection =
+        p.apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+        leftCollection, rightCollection);
+
+    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToManyMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    listRightOfKv.add(KV.of("Key2", "gazonk"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+        leftCollection, rightCollection);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinManyToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    leftListOfKv.add(KV.of("Key2", 6L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+        leftCollection, rightCollection);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinNoneToNoneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key3", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+        leftCollection, rightCollection);
+
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+    p.run();
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinLeftCollectionNull() {
+    Join.innerJoin(null, p.apply(Create.of(listRightOfKv)));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinRightCollectionNull() {
+    Join.innerJoin(p.apply(Create.of(leftListOfKv)), null);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java
new file mode 100644
index 0000000..91b0740
--- /dev/null
+++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.joinlibrary;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Tests for {@link Join#leftOuterJoin}.
+ */
+public class OuterLeftJoinTest {
+
+  private Pipeline p;
+  private List<KV<String, Long>> leftListOfKv;
+  private List<KV<String, String>> listRightOfKv;
+  private List<KV<String, KV<Long, String>>> expectedResult;
+
+  @Before
+  public void setup() {
+
+    p = TestPipeline.create();
+    leftListOfKv = new ArrayList<>();
+    listRightOfKv = new ArrayList<>();
+
+    expectedResult = new ArrayList<>();
+  }
+
+  @Test
+  public void testJoinOneToOneMapping() {
+    leftListOfKv.add(KV.of("Key1", 5L));
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key1", "foo"));
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+        leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToManyMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    listRightOfKv.add(KV.of("Key2", "gazonk"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+        leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinManyToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    leftListOfKv.add(KV.of("Key2", 6L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+        leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToNoneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key3", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+        leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+    p.run();
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinLeftCollectionNull() {
+    Join.leftOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinRightCollectionNull() {
+    Join.leftOuterJoin(p.apply(Create.of(leftListOfKv)), null, "");
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinNullValueIsNull() {
+    Join.leftOuterJoin(
+        p.apply("CreateLeft", Create.of(leftListOfKv)),
+        p.apply("CreateRight", Create.of(listRightOfKv)),
+        null);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java
new file mode 100644
index 0000000..7977df7
--- /dev/null
+++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.joinlibrary;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Tests for {@link Join#rightOuterJoin}.
+ */
+public class OuterRightJoinTest {
+
+  private Pipeline p;
+  private List<KV<String, Long>> leftListOfKv;
+  private List<KV<String, String>> listRightOfKv;
+  private List<KV<String, KV<Long, String>>> expectedResult;
+
+  @Before
+  public void setup() {
+
+    p = TestPipeline.create();
+    leftListOfKv = new ArrayList<>();
+    listRightOfKv = new ArrayList<>();
+
+    expectedResult = new ArrayList<>();
+  }
+
+  @Test
+  public void testJoinOneToOneMapping() {
+    leftListOfKv.add(KV.of("Key1", 5L));
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key1", "foo"));
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+        leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToManyMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    listRightOfKv.add(KV.of("Key2", "gazonk"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+        leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinManyToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    leftListOfKv.add(KV.of("Key2", 6L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+        leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinNoneToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key3", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+        leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key3", KV.of(-1L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+    p.run();
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinLeftCollectionNull() {
+    Join.rightOuterJoin(null, p.apply(Create.of(listRightOfKv)), -1L);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinRightCollectionNull() {
+    Join.rightOuterJoin(p.apply(Create.of(leftListOfKv)), null, -1L);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinNullValueIsNull() {
+    Join.rightOuterJoin(
+        p.apply("CreateLeft", Create.of(leftListOfKv)),
+        p.apply("CreateRight", Create.of(listRightOfKv)),
+        null);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
deleted file mode 100644
index 99e9c4b..0000000
--- a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.joinlibrary;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This test Inner Join functionality.
- */
-public class InnerJoinTest {
-
- Pipeline p;
- List<KV<String, Long>> leftListOfKv;
- List<KV<String, String>> listRightOfKv;
- List<KV<String, KV<Long, String>>> expectedResult;
-
- @Before
- public void setup() {
-
- p = TestPipeline.create();
- leftListOfKv = new ArrayList<>();
- listRightOfKv = new ArrayList<>();
-
- expectedResult = new ArrayList<>();
- }
-
- @Test
- public void testJoinOneToOneMapping() {
- leftListOfKv.add(KV.of("Key1", 5L));
- leftListOfKv.add(KV.of("Key2", 4L));
- PCollection<KV<String, Long>> leftCollection =
- p.apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key1", "foo"));
- listRightOfKv.add(KV.of("Key2", "bar"));
- PCollection<KV<String, String>> rightCollection =
- p.apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
- leftCollection, rightCollection);
-
- expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
- expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
- PAssert.that(output).containsInAnyOrder(expectedResult);
-
- p.run();
- }
-
- @Test
- public void testJoinOneToManyMapping() {
- leftListOfKv.add(KV.of("Key2", 4L));
- PCollection<KV<String, Long>> leftCollection = p
- .apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key2", "bar"));
- listRightOfKv.add(KV.of("Key2", "gazonk"));
- PCollection<KV<String, String>> rightCollection = p
- .apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
- leftCollection, rightCollection);
-
- expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
- expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
- PAssert.that(output).containsInAnyOrder(expectedResult);
-
- p.run();
- }
-
- @Test
- public void testJoinManyToOneMapping() {
- leftListOfKv.add(KV.of("Key2", 4L));
- leftListOfKv.add(KV.of("Key2", 6L));
- PCollection<KV<String, Long>> leftCollection = p
- .apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key2", "bar"));
- PCollection<KV<String, String>> rightCollection = p
- .apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
- leftCollection, rightCollection);
-
- expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
- expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
- PAssert.that(output).containsInAnyOrder(expectedResult);
-
- p.run();
- }
-
- @Test
- public void testJoinNoneToNoneMapping() {
- leftListOfKv.add(KV.of("Key2", 4L));
- PCollection<KV<String, Long>> leftCollection = p
- .apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key3", "bar"));
- PCollection<KV<String, String>> rightCollection = p
- .apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
- leftCollection, rightCollection);
-
- PAssert.that(output).containsInAnyOrder(expectedResult);
- p.run();
- }
-
- @Test(expected = NullPointerException.class)
- public void testJoinLeftCollectionNull() {
- Join.innerJoin(null, p.apply(Create.of(listRightOfKv)));
- }
-
- @Test(expected = NullPointerException.class)
- public void testJoinRightCollectionNull() {
- Join.innerJoin(p.apply(Create.of(leftListOfKv)), null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
deleted file mode 100644
index ca09136..0000000
--- a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.joinlibrary;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * This test Outer Left Join functionality.
- */
-public class OuterLeftJoinTest {
-
- Pipeline p;
- List<KV<String, Long>> leftListOfKv;
- List<KV<String, String>> listRightOfKv;
- List<KV<String, KV<Long, String>>> expectedResult;
-
- @Before
- public void setup() {
-
- p = TestPipeline.create();
- leftListOfKv = new ArrayList<>();
- listRightOfKv = new ArrayList<>();
-
- expectedResult = new ArrayList<>();
- }
-
- @Test
- public void testJoinOneToOneMapping() {
- leftListOfKv.add(KV.of("Key1", 5L));
- leftListOfKv.add(KV.of("Key2", 4L));
- PCollection<KV<String, Long>> leftCollection = p
- .apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key1", "foo"));
- listRightOfKv.add(KV.of("Key2", "bar"));
- PCollection<KV<String, String>> rightCollection = p
- .apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
- leftCollection, rightCollection, "");
-
- expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
- expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
- PAssert.that(output).containsInAnyOrder(expectedResult);
-
- p.run();
- }
-
- @Test
- public void testJoinOneToManyMapping() {
- leftListOfKv.add(KV.of("Key2", 4L));
- PCollection<KV<String, Long>> leftCollection = p
- .apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key2", "bar"));
- listRightOfKv.add(KV.of("Key2", "gazonk"));
- PCollection<KV<String, String>> rightCollection = p
- .apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
- leftCollection, rightCollection, "");
-
- expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
- expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
- PAssert.that(output).containsInAnyOrder(expectedResult);
-
- p.run();
- }
-
- @Test
- public void testJoinManyToOneMapping() {
- leftListOfKv.add(KV.of("Key2", 4L));
- leftListOfKv.add(KV.of("Key2", 6L));
- PCollection<KV<String, Long>> leftCollection = p
- .apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key2", "bar"));
- PCollection<KV<String, String>> rightCollection = p
- .apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
- leftCollection, rightCollection, "");
-
- expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
- expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
- PAssert.that(output).containsInAnyOrder(expectedResult);
-
- p.run();
- }
-
- @Test
- public void testJoinOneToNoneMapping() {
- leftListOfKv.add(KV.of("Key2", 4L));
- PCollection<KV<String, Long>> leftCollection = p
- .apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key3", "bar"));
- PCollection<KV<String, String>> rightCollection = p
- .apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
- leftCollection, rightCollection, "");
-
- expectedResult.add(KV.of("Key2", KV.of(4L, "")));
- PAssert.that(output).containsInAnyOrder(expectedResult);
- p.run();
- }
-
- @Test(expected = NullPointerException.class)
- public void testJoinLeftCollectionNull() {
- Join.leftOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
- }
-
- @Test(expected = NullPointerException.class)
- public void testJoinRightCollectionNull() {
- Join.leftOuterJoin(p.apply(Create.of(leftListOfKv)), null, "");
- }
-
- @Test(expected = NullPointerException.class)
- public void testJoinNullValueIsNull() {
- Join.leftOuterJoin(
- p.apply("CreateLeft", Create.of(leftListOfKv)),
- p.apply("CreateRight", Create.of(listRightOfKv)),
- null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
deleted file mode 100644
index 86028ac..0000000
--- a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.joinlibrary;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * This test Outer Right Join functionality.
- */
-public class OuterRightJoinTest {
-
- Pipeline p;
- List<KV<String, Long>> leftListOfKv;
- List<KV<String, String>> listRightOfKv;
- List<KV<String, KV<Long, String>>> expectedResult;
-
- @Before
- public void setup() {
-
- p = TestPipeline.create();
- leftListOfKv = new ArrayList<>();
- listRightOfKv = new ArrayList<>();
-
- expectedResult = new ArrayList<>();
- }
-
- @Test
- public void testJoinOneToOneMapping() {
- leftListOfKv.add(KV.of("Key1", 5L));
- leftListOfKv.add(KV.of("Key2", 4L));
- PCollection<KV<String, Long>> leftCollection = p
- .apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key1", "foo"));
- listRightOfKv.add(KV.of("Key2", "bar"));
- PCollection<KV<String, String>> rightCollection = p
- .apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
- leftCollection, rightCollection, -1L);
-
- expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
- expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
- PAssert.that(output).containsInAnyOrder(expectedResult);
-
- p.run();
- }
-
- @Test
- public void testJoinOneToManyMapping() {
- leftListOfKv.add(KV.of("Key2", 4L));
- PCollection<KV<String, Long>> leftCollection = p
- .apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key2", "bar"));
- listRightOfKv.add(KV.of("Key2", "gazonk"));
- PCollection<KV<String, String>> rightCollection = p
- .apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
- leftCollection, rightCollection, -1L);
-
- expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
- expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
- PAssert.that(output).containsInAnyOrder(expectedResult);
-
- p.run();
- }
-
- @Test
- public void testJoinManyToOneMapping() {
- leftListOfKv.add(KV.of("Key2", 4L));
- leftListOfKv.add(KV.of("Key2", 6L));
- PCollection<KV<String, Long>> leftCollection = p
- .apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key2", "bar"));
- PCollection<KV<String, String>> rightCollection = p
- .apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
- leftCollection, rightCollection, -1L);
-
- expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
- expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
- PAssert.that(output).containsInAnyOrder(expectedResult);
-
- p.run();
- }
-
- @Test
- public void testJoinNoneToOneMapping() {
- leftListOfKv.add(KV.of("Key2", 4L));
- PCollection<KV<String, Long>> leftCollection = p
- .apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key3", "bar"));
- PCollection<KV<String, String>> rightCollection = p
- .apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
- leftCollection, rightCollection, -1L);
-
- expectedResult.add(KV.of("Key3", KV.of(-1L, "bar")));
- PAssert.that(output).containsInAnyOrder(expectedResult);
- p.run();
- }
-
- @Test(expected = NullPointerException.class)
- public void testJoinLeftCollectionNull() {
- Join.rightOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
- }
-
- @Test(expected = NullPointerException.class)
- public void testJoinRightCollectionNull() {
- Join.rightOuterJoin(p.apply(Create.of(leftListOfKv)), null, -1L);
- }
-
- @Test(expected = NullPointerException.class)
- public void testJoinNullValueIsNull() {
- Join.rightOuterJoin(
- p.apply("CreateLeft", Create.of(leftListOfKv)),
- p.apply("CreateRight", Create.of(listRightOfKv)),
- null);
- }
-}
[3/3] incubator-beam git commit: This closes #240
Posted by bc...@apache.org.
This closes #240
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0f7b8161
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0f7b8161
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0f7b8161
Branch: refs/heads/master
Commit: 0f7b8161546c0e160cb1722275a698c15f5aafc1
Parents: d5814a3 e405941
Author: bchambers <bc...@google.com>
Authored: Mon Apr 25 17:32:30 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Mon Apr 25 17:32:30 2016 -0700
----------------------------------------------------------------------
contrib/README.md | 53 ------
contrib/hadoop/AUTHORS.md | 7 -
contrib/join-library/AUTHORS.md | 6 -
contrib/join-library/README.md | 33 ----
contrib/join-library/pom.xml | 185 ------------------
.../org/apache/contrib/joinlibrary/Join.java | 186 -------------------
.../contrib/joinlibrary/InnerJoinTest.java | 143 --------------
.../contrib/joinlibrary/OuterLeftJoinTest.java | 153 ---------------
.../contrib/joinlibrary/OuterRightJoinTest.java | 153 ---------------
sdks/java/extensions/join-library/README.md | 33 ++++
sdks/java/extensions/join-library/pom.xml | 111 +++++++++++
.../beam/sdk/extensions/joinlibrary/Join.java | 186 +++++++++++++++++++
.../extensions/joinlibrary/InnerJoinTest.java | 143 ++++++++++++++
.../joinlibrary/OuterLeftJoinTest.java | 153 +++++++++++++++
.../joinlibrary/OuterRightJoinTest.java | 153 +++++++++++++++
sdks/java/extensions/pom.xml | 40 ++++
sdks/java/pom.xml | 1 +
17 files changed, 820 insertions(+), 919 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-beam git commit: Move contrib/join-library to
sdks/java/extensions.
Posted by bc...@apache.org.
Move contrib/join-library to sdks/java/extensions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e2ca8890
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e2ca8890
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e2ca8890
Branch: refs/heads/master
Commit: e2ca88908097e58c4a4e022a86de7a23d9412850
Parents: d5814a3
Author: Davor Bonaci <da...@google.com>
Authored: Mon Apr 25 16:42:31 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Mon Apr 25 17:32:23 2016 -0700
----------------------------------------------------------------------
contrib/README.md | 53 ------
contrib/hadoop/AUTHORS.md | 7 -
contrib/join-library/AUTHORS.md | 6 -
contrib/join-library/README.md | 33 ----
contrib/join-library/pom.xml | 185 ------------------
.../org/apache/contrib/joinlibrary/Join.java | 186 -------------------
.../contrib/joinlibrary/InnerJoinTest.java | 143 --------------
.../contrib/joinlibrary/OuterLeftJoinTest.java | 153 ---------------
.../contrib/joinlibrary/OuterRightJoinTest.java | 153 ---------------
sdks/java/extensions/join-library/README.md | 33 ++++
sdks/java/extensions/join-library/pom.xml | 111 +++++++++++
.../org/apache/contrib/joinlibrary/Join.java | 186 +++++++++++++++++++
.../contrib/joinlibrary/InnerJoinTest.java | 143 ++++++++++++++
.../contrib/joinlibrary/OuterLeftJoinTest.java | 153 +++++++++++++++
.../contrib/joinlibrary/OuterRightJoinTest.java | 153 +++++++++++++++
sdks/java/extensions/pom.xml | 40 ++++
sdks/java/pom.xml | 1 +
17 files changed, 820 insertions(+), 919 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/README.md
----------------------------------------------------------------------
diff --git a/contrib/README.md b/contrib/README.md
deleted file mode 100644
index b99cf46..0000000
--- a/contrib/README.md
+++ /dev/null
@@ -1,53 +0,0 @@
-# Community contributions
-
-This directory hosts a wide variety of community contributions that may be
-useful to other users of
-[Google Cloud Dataflow](https://cloud.google.com/dataflow/),
-but may not be appropriate or ready yet for inclusion into the
-[mainline SDK](https://github.com/GoogleCloudPlatform/DataflowJavaSDK/) or a
-separate Google-maintained artifact.
-
-## Organization
-
-Each subdirectory represents a logically separate and independent module.
-Preferably, the code is hosted directly in this repository. When appropriate, we
-are also open to linking external repositories via
-[`submodule`](http://git-scm.com/docs/git-submodule/) functionality within Git.
-
-While we are happy to host individual modules to provide additional value to all
-Cloud Dataflow users, the modules are _maintained solely by their respective
-authors_. We will make sure that modules are related to Cloud Dataflow, that
-they are distributed under the same license as the mainline SDK, and provide
-some guidance to the authors to make the quality as high as possible.
-
-We __cannot__, however, provide _any_ guarantees about correctness,
-compatibility, performance, support, test coverage, maintenance or future
-availability of individual modules hosted here.
-
-## Process
-
-In general, we recommend getting in touch with us through the issue tracker
-first. That way we can help out and possibly guide you. Coordinating up front
-makes it much easier to avoid frustration later on.
-
-We welcome pull requests with a new module from everyone. Every module must be
-related to Cloud Dataflow and must have an informative README.md file. We will
-provide general guidance, but usually won't be reviewing the module in detail.
-We reserve the right to refuse any module, or to remove it at any
-time in the future.
-
-We also welcome improvements to an existing module from everyone. We'll often
-wait for comments from the primary author of the module before merging a pull
-request from a non-primary author.
-
-As the module matures, we may choose to pull it directly into the mainline SDK
-or promote it to a Google-managed artifact.
-
-## Licensing
-
-We require all contributors to sign the Contributor License Agreement, exactly
-as we require for any contributions to the mainline SDK. More information is
-available in our [CONTRIBUTING.md](https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/CONTRIBUTING.md)
-file.
-
-_Thank you for your contribution to the Cloud Dataflow community!_
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/hadoop/AUTHORS.md
----------------------------------------------------------------------
diff --git a/contrib/hadoop/AUTHORS.md b/contrib/hadoop/AUTHORS.md
deleted file mode 100644
index 6effdb9..0000000
--- a/contrib/hadoop/AUTHORS.md
+++ /dev/null
@@ -1,7 +0,0 @@
-# Authors of 'hadoop' module
-
-The following is the official list of authors for copyright purposes of this community-contributed module.
-
- Cloudera
- Tom White, tom [at] cloudera [dot] com
- Google Inc.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/join-library/AUTHORS.md
----------------------------------------------------------------------
diff --git a/contrib/join-library/AUTHORS.md b/contrib/join-library/AUTHORS.md
deleted file mode 100644
index d32b6a7..0000000
--- a/contrib/join-library/AUTHORS.md
+++ /dev/null
@@ -1,6 +0,0 @@
-# Authors of join-library
-
-The following is the official list of authors for copyright purposes of this community-contributed module.
-
- Google Inc.
- Magnus Runesson, M.Runesson [at] gmail [dot] com
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/join-library/README.md
----------------------------------------------------------------------
diff --git a/contrib/join-library/README.md b/contrib/join-library/README.md
deleted file mode 100644
index 8e2a011..0000000
--- a/contrib/join-library/README.md
+++ /dev/null
@@ -1,33 +0,0 @@
-Join-library
-============
-
-Join-library provides inner join and left and right outer join functions for
-Google Cloud Dataflow. The aim is to reduce the most common join cases to a
-simple function call.
-
-The functions are generic, so they support joining any types supported by
-Dataflow. The inputs to the join functions are PCollections of Key/Values. Both
-the left and right PCollections must use the same key type. All the join
-functions return a Key/Value where the key is the join key and the value is
-a Key/Value whose key is the left value and whose value is the right value.
-
-For outer joins, since null cannot be serialized, the user has to provide
-a value that represents null for that particular use case.
-
-Example of how to use join-library:
-
- PCollection<KV<String, String>> leftPcollection = ...
- PCollection<KV<String, Long>> rightPcollection = ...
-
- PCollection<KV<String, KV<String, Long>>> joinedPcollection =
- Join.innerJoin(leftPcollection, rightPcollection);
-
-Join-library can be found on maven-central:
-
- <dependency>
- <groupId>org.linuxalert.dataflow</groupId>
- <artifactId>google-cloud-dataflow-java-contrib-joinlibrary</artifactId>
- <version>0.0.3</version>
- </dependency>
-
-Questions or comments: `M.Runesson [at] gmail [dot] com`
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/join-library/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/join-library/pom.xml b/contrib/join-library/pom.xml
deleted file mode 100644
index 090f445..0000000
--- a/contrib/join-library/pom.xml
+++ /dev/null
@@ -1,185 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>org.linuxalert.dataflow</groupId>
- <artifactId>google-cloud-dataflow-java-contrib-joinlibrary</artifactId>
- <name>Google Cloud Dataflow Join Library</name>
- <description>Library with generic join functions for Dataflow.</description>
- <url>https://github.com/GoogleCloudPlatform/DataflowJavaSDK/tree/master/contrib/join-library</url>
- <developers>
- <developer>
- <organization>Google Inc.</organization>
- <organizationUrl>http://www.google.com</organizationUrl>
- </developer>
- <developer>
- <name>Magnus Runesson</name>
- <email>M (dot) Runesson (at) gmail (dot) com</email>
- <roles>
- <role>Developer</role>
- </roles>
- <timezone>+1</timezone>
- </developer>
- </developers>
- <contributors>
- <contributor>
- <name>Magnus Runesson</name>
- <email>M (dot) Runesson (at) gmail (dot) com</email>
- <url>https://github.com/mrunesson</url>
- </contributor>
- </contributors>
- <version>0.0.4</version>
- <packaging>jar</packaging>
-
- <licenses>
- <license>
- <name>Apache License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- <distribution>repo</distribution>
- </license>
- </licenses>
-
- <scm>
- <connection>scm:git:git@github.com:GoogleCloudPlatform/DataflowJavaSDK.git</connection>
- <developerConnection>scm:git:git@github.com:GoogleCloudPlatform/DataflowJavaSDK.git</developerConnection>
- <url>https://github.com/GoogleCloudPlatform/DataflowJavaSDK/tree/master/contrib/join-library</url>
- </scm>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <beam-version>[0.1.0, 1.0.0)</beam-version>
- </properties>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.2</version>
- <configuration>
- <source>1.7</source>
- <target>1.7</target>
- </configuration>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <version>2.17</version>
- <configuration>
- <configLocation>../../sdks/java/checkstyle.xml</configLocation>
- <consoleOutput>true</consoleOutput>
- <failOnViolation>true</failOnViolation>
- <includeTestSourceDirectory>true</includeTestSourceDirectory>
- </configuration>
- <executions>
- <execution>
- <phase>validate</phase>
- <goals>
- <goal>check</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-source-plugin</artifactId>
- <version>2.4</version>
- <executions>
- <execution>
- <id>attach-sources</id>
- <goals>
- <goal>jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <version>2.10.3</version>
- <executions>
- <execution>
- <id>attach-javadocs</id>
- <goals>
- <goal>jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.sonatype.plugins</groupId>
- <artifactId>nexus-staging-maven-plugin</artifactId>
- <version>1.6.3</version>
- <extensions>true</extensions>
- <configuration>
- <serverId>ossrh</serverId>
- <nexusUrl>https://oss.sonatype.org/</nexusUrl>
- <autoReleaseAfterClose>true</autoReleaseAfterClose>
- </configuration>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-gpg-plugin</artifactId>
- <version>1.5</version>
- <executions>
- <execution>
- <id>sign-artifacts</id>
- <phase>verify</phase>
- <goals>
- <goal>sign</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
- <distributionManagement>
- <snapshotRepository>
- <id>ossrh</id>
- <url>https://oss.sonatype.org/content/repositories/snapshots</url>
- </snapshotRepository>
- <repository>
- <id>ossrh</id>
- <url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
- </repository>
- </distributionManagement>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>java-sdk-all</artifactId>
- <version>${beam-version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>19.0</version>
- </dependency>
-
- <!-- Dependency for tests -->
- <dependency>
- <groupId>org.hamcrest</groupId>
- <artifactId>hamcrest-all</artifactId>
- <version>1.3</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.12</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java
----------------------------------------------------------------------
diff --git a/contrib/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java b/contrib/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java
deleted file mode 100644
index 6421e97..0000000
--- a/contrib/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.joinlibrary;
-
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-import com.google.common.base.Preconditions;
-
-/**
- * Utility class with different versions of joins. All methods join two collections of
- * key/value pairs (KV).
- */
-public class Join {
-
- /**
- * Inner join of two collections of KV elements.
- * @param leftCollection Left side collection to join.
- * @param rightCollection Right side collection to join.
- * @param <K> Type of the key for both collections
- * @param <V1> Type of the values for the left collection.
- * @param <V2> Type of the values for the right collection.
- * @return A joined collection of KV where Key is the key and value is a
- * KV where Key is of type V1 and Value is type V2.
- */
- public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> innerJoin(
- final PCollection<KV<K, V1>> leftCollection, final PCollection<KV<K, V2>> rightCollection) {
- Preconditions.checkNotNull(leftCollection);
- Preconditions.checkNotNull(rightCollection);
-
- final TupleTag<V1> v1Tuple = new TupleTag<>();
- final TupleTag<V2> v2Tuple = new TupleTag<>();
-
- PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
- KeyedPCollectionTuple.of(v1Tuple, leftCollection)
- .and(v2Tuple, rightCollection)
- .apply(CoGroupByKey.<K>create());
-
- return coGbkResultCollection.apply(ParDo.of(
- new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
- @Override
- public void processElement(ProcessContext c) {
- KV<K, CoGbkResult> e = c.element();
-
- Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
- Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
-
- for (V1 leftValue : leftValuesIterable) {
- for (V2 rightValue : rightValuesIterable) {
- c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
- }
- }
- }
- }))
- .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
- KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
- ((KvCoder) rightCollection.getCoder()).getValueCoder())));
- }
-
- /**
- * Left Outer Join of two collections of KV elements.
- * @param leftCollection Left side collection to join.
- * @param rightCollection Right side collection to join.
- * @param nullValue Value to use as null value when right side do not match left side.
- * @param <K> Type of the key for both collections
- * @param <V1> Type of the values for the left collection.
- * @param <V2> Type of the values for the right collection.
- * @return A joined collection of KV where Key is the key and value is a
- * KV where Key is of type V1 and Value is type V2. Values that
- * should be null or empty is replaced with nullValue.
- */
- public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> leftOuterJoin(
- final PCollection<KV<K, V1>> leftCollection,
- final PCollection<KV<K, V2>> rightCollection,
- final V2 nullValue) {
- Preconditions.checkNotNull(leftCollection);
- Preconditions.checkNotNull(rightCollection);
- Preconditions.checkNotNull(nullValue);
-
- final TupleTag<V1> v1Tuple = new TupleTag<>();
- final TupleTag<V2> v2Tuple = new TupleTag<>();
-
- PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
- KeyedPCollectionTuple.of(v1Tuple, leftCollection)
- .and(v2Tuple, rightCollection)
- .apply(CoGroupByKey.<K>create());
-
- return coGbkResultCollection.apply(ParDo.of(
- new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
- @Override
- public void processElement(ProcessContext c) {
- KV<K, CoGbkResult> e = c.element();
-
- Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
- Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
-
- for (V1 leftValue : leftValuesIterable) {
- if (rightValuesIterable.iterator().hasNext()) {
- for (V2 rightValue : rightValuesIterable) {
- c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
- }
- } else {
- c.output(KV.of(e.getKey(), KV.of(leftValue, nullValue)));
- }
- }
- }
- }))
- .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
- KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
- ((KvCoder) rightCollection.getCoder()).getValueCoder())));
- }
-
- /**
- * Right Outer Join of two collections of KV elements.
- * @param leftCollection Left side collection to join.
- * @param rightCollection Right side collection to join.
- * @param nullValue Value to use as null value when left side do not match right side.
- * @param <K> Type of the key for both collections
- * @param <V1> Type of the values for the left collection.
- * @param <V2> Type of the values for the right collection.
- * @return A joined collection of KV where Key is the key and value is a
- * KV where Key is of type V1 and Value is type V2. Keys that
- * should be null or empty is replaced with nullValue.
- */
- public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> rightOuterJoin(
- final PCollection<KV<K, V1>> leftCollection,
- final PCollection<KV<K, V2>> rightCollection,
- final V1 nullValue) {
- Preconditions.checkNotNull(leftCollection);
- Preconditions.checkNotNull(rightCollection);
- Preconditions.checkNotNull(nullValue);
-
- final TupleTag<V1> v1Tuple = new TupleTag<>();
- final TupleTag<V2> v2Tuple = new TupleTag<>();
-
- PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
- KeyedPCollectionTuple.of(v1Tuple, leftCollection)
- .and(v2Tuple, rightCollection)
- .apply(CoGroupByKey.<K>create());
-
- return coGbkResultCollection.apply(ParDo.of(
- new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
- @Override
- public void processElement(ProcessContext c) {
- KV<K, CoGbkResult> e = c.element();
-
- Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
- Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
-
- for (V2 rightValue : rightValuesIterable) {
- if (leftValuesIterable.iterator().hasNext()) {
- for (V1 leftValue : leftValuesIterable) {
- c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
- }
- } else {
- c.output(KV.of(e.getKey(), KV.of(nullValue, rightValue)));
- }
- }
- }
- }))
- .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
- KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
- ((KvCoder) rightCollection.getCoder()).getValueCoder())));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
----------------------------------------------------------------------
diff --git a/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java b/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
deleted file mode 100644
index 99e9c4b..0000000
--- a/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.joinlibrary;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This test Inner Join functionality.
- */
-public class InnerJoinTest {
-
- Pipeline p;
- List<KV<String, Long>> leftListOfKv;
- List<KV<String, String>> listRightOfKv;
- List<KV<String, KV<Long, String>>> expectedResult;
-
- @Before
- public void setup() {
-
- p = TestPipeline.create();
- leftListOfKv = new ArrayList<>();
- listRightOfKv = new ArrayList<>();
-
- expectedResult = new ArrayList<>();
- }
-
- @Test
- public void testJoinOneToOneMapping() {
- leftListOfKv.add(KV.of("Key1", 5L));
- leftListOfKv.add(KV.of("Key2", 4L));
- PCollection<KV<String, Long>> leftCollection =
- p.apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key1", "foo"));
- listRightOfKv.add(KV.of("Key2", "bar"));
- PCollection<KV<String, String>> rightCollection =
- p.apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
- leftCollection, rightCollection);
-
- expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
- expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
- PAssert.that(output).containsInAnyOrder(expectedResult);
-
- p.run();
- }
-
- @Test
- public void testJoinOneToManyMapping() {
- leftListOfKv.add(KV.of("Key2", 4L));
- PCollection<KV<String, Long>> leftCollection = p
- .apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key2", "bar"));
- listRightOfKv.add(KV.of("Key2", "gazonk"));
- PCollection<KV<String, String>> rightCollection = p
- .apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
- leftCollection, rightCollection);
-
- expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
- expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
- PAssert.that(output).containsInAnyOrder(expectedResult);
-
- p.run();
- }
-
- @Test
- public void testJoinManyToOneMapping() {
- leftListOfKv.add(KV.of("Key2", 4L));
- leftListOfKv.add(KV.of("Key2", 6L));
- PCollection<KV<String, Long>> leftCollection = p
- .apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key2", "bar"));
- PCollection<KV<String, String>> rightCollection = p
- .apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
- leftCollection, rightCollection);
-
- expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
- expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
- PAssert.that(output).containsInAnyOrder(expectedResult);
-
- p.run();
- }
-
- @Test
- public void testJoinNoneToNoneMapping() {
- leftListOfKv.add(KV.of("Key2", 4L));
- PCollection<KV<String, Long>> leftCollection = p
- .apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key3", "bar"));
- PCollection<KV<String, String>> rightCollection = p
- .apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
- leftCollection, rightCollection);
-
- PAssert.that(output).containsInAnyOrder(expectedResult);
- p.run();
- }
-
- @Test(expected = NullPointerException.class)
- public void testJoinLeftCollectionNull() {
- Join.innerJoin(null, p.apply(Create.of(listRightOfKv)));
- }
-
- @Test(expected = NullPointerException.class)
- public void testJoinRightCollectionNull() {
- Join.innerJoin(p.apply(Create.of(leftListOfKv)), null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java b/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
deleted file mode 100644
index ca09136..0000000
--- a/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.joinlibrary;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * This test Outer Left Join functionality.
- */
-public class OuterLeftJoinTest {
-
- Pipeline p;
- List<KV<String, Long>> leftListOfKv;
- List<KV<String, String>> listRightOfKv;
- List<KV<String, KV<Long, String>>> expectedResult;
-
- @Before
- public void setup() {
-
- p = TestPipeline.create();
- leftListOfKv = new ArrayList<>();
- listRightOfKv = new ArrayList<>();
-
- expectedResult = new ArrayList<>();
- }
-
- @Test
- public void testJoinOneToOneMapping() {
- leftListOfKv.add(KV.of("Key1", 5L));
- leftListOfKv.add(KV.of("Key2", 4L));
- PCollection<KV<String, Long>> leftCollection = p
- .apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key1", "foo"));
- listRightOfKv.add(KV.of("Key2", "bar"));
- PCollection<KV<String, String>> rightCollection = p
- .apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
- leftCollection, rightCollection, "");
-
- expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
- expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
- PAssert.that(output).containsInAnyOrder(expectedResult);
-
- p.run();
- }
-
- @Test
- public void testJoinOneToManyMapping() {
- leftListOfKv.add(KV.of("Key2", 4L));
- PCollection<KV<String, Long>> leftCollection = p
- .apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key2", "bar"));
- listRightOfKv.add(KV.of("Key2", "gazonk"));
- PCollection<KV<String, String>> rightCollection = p
- .apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
- leftCollection, rightCollection, "");
-
- expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
- expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
- PAssert.that(output).containsInAnyOrder(expectedResult);
-
- p.run();
- }
-
- @Test
- public void testJoinManyToOneMapping() {
- leftListOfKv.add(KV.of("Key2", 4L));
- leftListOfKv.add(KV.of("Key2", 6L));
- PCollection<KV<String, Long>> leftCollection = p
- .apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key2", "bar"));
- PCollection<KV<String, String>> rightCollection = p
- .apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
- leftCollection, rightCollection, "");
-
- expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
- expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
- PAssert.that(output).containsInAnyOrder(expectedResult);
-
- p.run();
- }
-
- @Test
- public void testJoinOneToNoneMapping() {
- leftListOfKv.add(KV.of("Key2", 4L));
- PCollection<KV<String, Long>> leftCollection = p
- .apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key3", "bar"));
- PCollection<KV<String, String>> rightCollection = p
- .apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
- leftCollection, rightCollection, "");
-
- expectedResult.add(KV.of("Key2", KV.of(4L, "")));
- PAssert.that(output).containsInAnyOrder(expectedResult);
- p.run();
- }
-
- @Test(expected = NullPointerException.class)
- public void testJoinLeftCollectionNull() {
- Join.leftOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
- }
-
- @Test(expected = NullPointerException.class)
- public void testJoinRightCollectionNull() {
- Join.leftOuterJoin(p.apply(Create.of(leftListOfKv)), null, "");
- }
-
- @Test(expected = NullPointerException.class)
- public void testJoinNullValueIsNull() {
- Join.leftOuterJoin(
- p.apply("CreateLeft", Create.of(leftListOfKv)),
- p.apply("CreateRight", Create.of(listRightOfKv)),
- null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
----------------------------------------------------------------------
diff --git a/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java b/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
deleted file mode 100644
index 86028ac..0000000
--- a/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.joinlibrary;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * This test Outer Right Join functionality.
- */
-public class OuterRightJoinTest {
-
- Pipeline p;
- List<KV<String, Long>> leftListOfKv;
- List<KV<String, String>> listRightOfKv;
- List<KV<String, KV<Long, String>>> expectedResult;
-
- @Before
- public void setup() {
-
- p = TestPipeline.create();
- leftListOfKv = new ArrayList<>();
- listRightOfKv = new ArrayList<>();
-
- expectedResult = new ArrayList<>();
- }
-
- @Test
- public void testJoinOneToOneMapping() {
- leftListOfKv.add(KV.of("Key1", 5L));
- leftListOfKv.add(KV.of("Key2", 4L));
- PCollection<KV<String, Long>> leftCollection = p
- .apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key1", "foo"));
- listRightOfKv.add(KV.of("Key2", "bar"));
- PCollection<KV<String, String>> rightCollection = p
- .apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
- leftCollection, rightCollection, -1L);
-
- expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
- expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
- PAssert.that(output).containsInAnyOrder(expectedResult);
-
- p.run();
- }
-
- @Test
- public void testJoinOneToManyMapping() {
- leftListOfKv.add(KV.of("Key2", 4L));
- PCollection<KV<String, Long>> leftCollection = p
- .apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key2", "bar"));
- listRightOfKv.add(KV.of("Key2", "gazonk"));
- PCollection<KV<String, String>> rightCollection = p
- .apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
- leftCollection, rightCollection, -1L);
-
- expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
- expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
- PAssert.that(output).containsInAnyOrder(expectedResult);
-
- p.run();
- }
-
- @Test
- public void testJoinManyToOneMapping() {
- leftListOfKv.add(KV.of("Key2", 4L));
- leftListOfKv.add(KV.of("Key2", 6L));
- PCollection<KV<String, Long>> leftCollection = p
- .apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key2", "bar"));
- PCollection<KV<String, String>> rightCollection = p
- .apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
- leftCollection, rightCollection, -1L);
-
- expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
- expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
- PAssert.that(output).containsInAnyOrder(expectedResult);
-
- p.run();
- }
-
- @Test
- public void testJoinNoneToOneMapping() {
- leftListOfKv.add(KV.of("Key2", 4L));
- PCollection<KV<String, Long>> leftCollection = p
- .apply("CreateLeft", Create.of(leftListOfKv));
-
- listRightOfKv.add(KV.of("Key3", "bar"));
- PCollection<KV<String, String>> rightCollection = p
- .apply("CreateRight", Create.of(listRightOfKv));
-
- PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
- leftCollection, rightCollection, -1L);
-
- expectedResult.add(KV.of("Key3", KV.of(-1L, "bar")));
- PAssert.that(output).containsInAnyOrder(expectedResult);
- p.run();
- }
-
- @Test(expected = NullPointerException.class)
- public void testJoinLeftCollectionNull() {
- Join.rightOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
- }
-
- @Test(expected = NullPointerException.class)
- public void testJoinRightCollectionNull() {
- Join.rightOuterJoin(p.apply(Create.of(leftListOfKv)), null, -1L);
- }
-
- @Test(expected = NullPointerException.class)
- public void testJoinNullValueIsNull() {
- Join.rightOuterJoin(
- p.apply("CreateLeft", Create.of(leftListOfKv)),
- p.apply("CreateRight", Create.of(listRightOfKv)),
- null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/extensions/join-library/README.md
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/README.md b/sdks/java/extensions/join-library/README.md
new file mode 100644
index 0000000..8e2a011
--- /dev/null
+++ b/sdks/java/extensions/join-library/README.md
@@ -0,0 +1,33 @@
+Join-library
+============
+
+Join-library provides inner join, left outer join, and right outer join
+functions for Google Cloud Dataflow. The aim is to reduce the most common join
+cases to a simple function call.
+
+The functions are generic, so they support joins of any types supported by
+Dataflow. The inputs to the join functions are PCollections of Key/Values. The
+left and right PCollections must use the same key type. All the join functions
+return a Key/Value where the key is the join key and the value is a Key/Value
+whose key is the left value and whose value is the right value.
+
+For outer joins, since null cannot be serialized, the user has to provide a
+value that represents null for that particular use case.
+
+Example of how to use join-library:
+
+ PCollection<KV<String, String>> leftPcollection = ...
+ PCollection<KV<String, Long>> rightPcollection = ...
+
+ PCollection<KV<String, KV<String, Long>>> joinedPcollection =
+ Join.innerJoin(leftPcollection, rightPcollection);
+
+Join-library can be found on Maven Central:
+
+ <dependency>
+ <groupId>org.linuxalert.dataflow</groupId>
+ <artifactId>google-cloud-dataflow-java-contrib-joinlibrary</artifactId>
+ <version>0.0.3</version>
+ </dependency>
+
+Questions or comments: `M.Runesson [at] gmail [dot] com`
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/extensions/join-library/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/pom.xml b/sdks/java/extensions/join-library/pom.xml
new file mode 100644
index 0000000..2765276
--- /dev/null
+++ b/sdks/java/extensions/join-library/pom.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>extensions-parent</artifactId>
+ <version>0.1.0-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>join-library</artifactId>
+ <name>Apache Beam :: SDKs :: Java :: Extensions :: Join library</name>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.2</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <configuration>
+ <configLocation>../../checkstyle.xml</configLocation>
+ <consoleOutput>true</consoleOutput>
+ <failOnViolation>true</failOnViolation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>validate</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>java-sdk-all</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>19.0</version>
+ </dependency>
+
+ <!-- Dependency for tests -->
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java b/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java
new file mode 100644
index 0000000..6421e97
--- /dev/null
+++ b/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.contrib.joinlibrary;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import com.google.common.base.Preconditions;
+
+/**
+ * Utility class with different versions of joins. All methods join two collections of
+ * key/value pairs (KV).
+ */
+public class Join {
+
+ /**
+ * Inner join of two collections of KV elements.
+ * @param leftCollection Left side collection to join.
+ * @param rightCollection Right side collection to join.
+ * @param <K> Type of the key for both collections
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of the values for the right collection.
+ * @return A joined collection of KV where Key is the key and value is a
+ * KV where Key is of type V1 and Value is type V2.
+ */
+ public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> innerJoin(
+ final PCollection<KV<K, V1>> leftCollection, final PCollection<KV<K, V2>> rightCollection) {
+ Preconditions.checkNotNull(leftCollection);
+ Preconditions.checkNotNull(rightCollection);
+
+ final TupleTag<V1> v1Tuple = new TupleTag<>();
+ final TupleTag<V2> v2Tuple = new TupleTag<>();
+
+ PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
+ KeyedPCollectionTuple.of(v1Tuple, leftCollection)
+ .and(v2Tuple, rightCollection)
+ .apply(CoGroupByKey.<K>create());
+
+ return coGbkResultCollection.apply(ParDo.of(
+ new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ KV<K, CoGbkResult> e = c.element();
+
+ Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
+ Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
+
+ for (V1 leftValue : leftValuesIterable) {
+ for (V2 rightValue : rightValuesIterable) {
+ c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
+ }
+ }
+ }
+ }))
+ .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
+ KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
+ ((KvCoder) rightCollection.getCoder()).getValueCoder())));
+ }
+
+ /**
+ * Left Outer Join of two collections of KV elements.
+ * @param leftCollection Left side collection to join.
+ * @param rightCollection Right side collection to join.
+ * @param nullValue Value to use as null value when right side do not match left side.
+ * @param <K> Type of the key for both collections
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of the values for the right collection.
+ * @return A joined collection of KV where Key is the key and value is a
+ * KV where Key is of type V1 and Value is type V2. Values that
+ * should be null or empty is replaced with nullValue.
+ */
+ public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> leftOuterJoin(
+ final PCollection<KV<K, V1>> leftCollection,
+ final PCollection<KV<K, V2>> rightCollection,
+ final V2 nullValue) {
+ Preconditions.checkNotNull(leftCollection);
+ Preconditions.checkNotNull(rightCollection);
+ Preconditions.checkNotNull(nullValue);
+
+ final TupleTag<V1> v1Tuple = new TupleTag<>();
+ final TupleTag<V2> v2Tuple = new TupleTag<>();
+
+ PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
+ KeyedPCollectionTuple.of(v1Tuple, leftCollection)
+ .and(v2Tuple, rightCollection)
+ .apply(CoGroupByKey.<K>create());
+
+ return coGbkResultCollection.apply(ParDo.of(
+ new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ KV<K, CoGbkResult> e = c.element();
+
+ Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
+ Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
+
+ for (V1 leftValue : leftValuesIterable) {
+ if (rightValuesIterable.iterator().hasNext()) {
+ for (V2 rightValue : rightValuesIterable) {
+ c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
+ }
+ } else {
+ c.output(KV.of(e.getKey(), KV.of(leftValue, nullValue)));
+ }
+ }
+ }
+ }))
+ .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
+ KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
+ ((KvCoder) rightCollection.getCoder()).getValueCoder())));
+ }
+
+ /**
+ * Right Outer Join of two collections of KV elements.
+ * @param leftCollection Left side collection to join.
+ * @param rightCollection Right side collection to join.
+ * @param nullValue Value to use as null value when left side do not match right side.
+ * @param <K> Type of the key for both collections
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of the values for the right collection.
+ * @return A joined collection of KV where Key is the key and value is a
+ * KV where Key is of type V1 and Value is type V2. Keys that
+ * should be null or empty is replaced with nullValue.
+ */
+ public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> rightOuterJoin(
+ final PCollection<KV<K, V1>> leftCollection,
+ final PCollection<KV<K, V2>> rightCollection,
+ final V1 nullValue) {
+ Preconditions.checkNotNull(leftCollection);
+ Preconditions.checkNotNull(rightCollection);
+ Preconditions.checkNotNull(nullValue);
+
+ final TupleTag<V1> v1Tuple = new TupleTag<>();
+ final TupleTag<V2> v2Tuple = new TupleTag<>();
+
+ PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
+ KeyedPCollectionTuple.of(v1Tuple, leftCollection)
+ .and(v2Tuple, rightCollection)
+ .apply(CoGroupByKey.<K>create());
+
+ return coGbkResultCollection.apply(ParDo.of(
+ new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ KV<K, CoGbkResult> e = c.element();
+
+ Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
+ Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
+
+ for (V2 rightValue : rightValuesIterable) {
+ if (leftValuesIterable.iterator().hasNext()) {
+ for (V1 leftValue : leftValuesIterable) {
+ c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
+ }
+ } else {
+ c.output(KV.of(e.getKey(), KV.of(nullValue, rightValue)));
+ }
+ }
+ }
+ }))
+ .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
+ KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
+ ((KvCoder) rightCollection.getCoder()).getValueCoder())));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
new file mode 100644
index 0000000..99e9c4b
--- /dev/null
+++ b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.contrib.joinlibrary;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests Inner Join functionality.
+ */
+public class InnerJoinTest {
+
+  private Pipeline p;
+  private List<KV<String, Long>> leftListOfKv;
+  private List<KV<String, String>> rightListOfKv;
+  private List<KV<String, KV<Long, String>>> expectedResult;
+
+  @Before
+  public void setup() {
+
+    p = TestPipeline.create();
+    leftListOfKv = new ArrayList<>();
+    rightListOfKv = new ArrayList<>();
+
+    expectedResult = new ArrayList<>();
+  }
+
+  @Test
+  public void testJoinOneToOneMapping() {
+    leftListOfKv.add(KV.of("Key1", 5L));
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection =
+        p.apply("CreateLeft", Create.of(leftListOfKv));
+
+    rightListOfKv.add(KV.of("Key1", "foo"));
+    rightListOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection =
+        p.apply("CreateRight", Create.of(rightListOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+        leftCollection, rightCollection);
+
+    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToManyMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    rightListOfKv.add(KV.of("Key2", "bar"));
+    rightListOfKv.add(KV.of("Key2", "gazonk"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(rightListOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+        leftCollection, rightCollection);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinManyToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    leftListOfKv.add(KV.of("Key2", 6L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    rightListOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(rightListOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+        leftCollection, rightCollection);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinNoneToNoneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    rightListOfKv.add(KV.of("Key3", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(rightListOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+        leftCollection, rightCollection);
+
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+    p.run();
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinLeftCollectionNull() {
+    Join.innerJoin(null, p.apply(Create.of(rightListOfKv)));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinRightCollectionNull() {
+    Join.innerJoin(p.apply(Create.of(leftListOfKv)), null);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
new file mode 100644
index 0000000..ca09136
--- /dev/null
+++ b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.contrib.joinlibrary;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Tests Outer Left Join functionality.
+ */
+public class OuterLeftJoinTest {
+
+  private Pipeline p;
+  private List<KV<String, Long>> leftListOfKv;
+  private List<KV<String, String>> rightListOfKv;
+  private List<KV<String, KV<Long, String>>> expectedResult;
+
+  @Before
+  public void setup() {
+
+    p = TestPipeline.create();
+    leftListOfKv = new ArrayList<>();
+    rightListOfKv = new ArrayList<>();
+
+    expectedResult = new ArrayList<>();
+  }
+
+  @Test
+  public void testJoinOneToOneMapping() {
+    leftListOfKv.add(KV.of("Key1", 5L));
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    rightListOfKv.add(KV.of("Key1", "foo"));
+    rightListOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(rightListOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+        leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToManyMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    rightListOfKv.add(KV.of("Key2", "bar"));
+    rightListOfKv.add(KV.of("Key2", "gazonk"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(rightListOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+        leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinManyToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    leftListOfKv.add(KV.of("Key2", 6L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    rightListOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(rightListOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+        leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToNoneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    rightListOfKv.add(KV.of("Key3", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(rightListOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+        leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+    p.run();
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinLeftCollectionNull() {
+    Join.leftOuterJoin(null, p.apply(Create.of(rightListOfKv)), "");
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinRightCollectionNull() {
+    Join.leftOuterJoin(p.apply(Create.of(leftListOfKv)), null, "");
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinNullValueIsNull() {
+    Join.leftOuterJoin(
+        p.apply("CreateLeft", Create.of(leftListOfKv)),
+        p.apply("CreateRight", Create.of(rightListOfKv)),
+        null);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
new file mode 100644
index 0000000..86028ac
--- /dev/null
+++ b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.contrib.joinlibrary;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Tests Outer Right Join functionality.
+ */
+public class OuterRightJoinTest {
+
+  private Pipeline p;
+  private List<KV<String, Long>> leftListOfKv;
+  private List<KV<String, String>> rightListOfKv;
+  private List<KV<String, KV<Long, String>>> expectedResult;
+
+  @Before
+  public void setup() {
+
+    p = TestPipeline.create();
+    leftListOfKv = new ArrayList<>();
+    rightListOfKv = new ArrayList<>();
+
+    expectedResult = new ArrayList<>();
+  }
+
+  @Test
+  public void testJoinOneToOneMapping() {
+    leftListOfKv.add(KV.of("Key1", 5L));
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    rightListOfKv.add(KV.of("Key1", "foo"));
+    rightListOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(rightListOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+        leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToManyMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    rightListOfKv.add(KV.of("Key2", "bar"));
+    rightListOfKv.add(KV.of("Key2", "gazonk"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(rightListOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+        leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinManyToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    leftListOfKv.add(KV.of("Key2", 6L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    rightListOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(rightListOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+        leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinNoneToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    rightListOfKv.add(KV.of("Key3", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(rightListOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+        leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key3", KV.of(-1L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+    p.run();
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinLeftCollectionNull() {
+    Join.rightOuterJoin(null, p.apply(Create.of(rightListOfKv)), -1L);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinRightCollectionNull() {
+    Join.rightOuterJoin(p.apply(Create.of(leftListOfKv)), null, -1L);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinNullValueIsNull() {
+    Join.rightOuterJoin(
+        p.apply("CreateLeft", Create.of(leftListOfKv)),
+        p.apply("CreateRight", Create.of(rightListOfKv)),
+        null);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml
new file mode 100644
index 0000000..180b8b7
--- /dev/null
+++ b/sdks/java/extensions/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>java-sdk-parent</artifactId>
+ <version>0.1.0-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>extensions-parent</artifactId>
+ <packaging>pom</packaging>
+
+ <name>Apache Beam :: SDKs :: Java :: Extensions</name>
+
+ <modules>
+ <module>join-library</module>
+ </modules>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index 6bd7ee7..e5da2d5 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -41,6 +41,7 @@
DataflowPipelineRunner. Until these are refactored out or
a released artifact exists, we need to modify the build order.
<module>maven-archetypes</module> -->
+ <module>extensions</module>
</modules>
<profiles>