You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/04/26 02:50:22 UTC

[1/3] incubator-beam git commit: Reorganize Java packages

Repository: incubator-beam
Updated Branches:
  refs/heads/master d5814a38d -> 0f7b81615


Reorganize Java packages

Move org.apache.contrib.joinlibrary to org.apache.beam.sdk.extensions.joinlibrary.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e4059412
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e4059412
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e4059412

Branch: refs/heads/master
Commit: e4059412d111a223ee7cfda0004a62699b38f41f
Parents: e2ca889
Author: Davor Bonaci <da...@google.com>
Authored: Mon Apr 25 16:45:59 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Mon Apr 25 17:32:23 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/extensions/joinlibrary/Join.java   | 186 +++++++++++++++++++
 .../org/apache/contrib/joinlibrary/Join.java    | 186 -------------------
 .../extensions/joinlibrary/InnerJoinTest.java   | 143 ++++++++++++++
 .../joinlibrary/OuterLeftJoinTest.java          | 153 +++++++++++++++
 .../joinlibrary/OuterRightJoinTest.java         | 153 +++++++++++++++
 .../contrib/joinlibrary/InnerJoinTest.java      | 143 --------------
 .../contrib/joinlibrary/OuterLeftJoinTest.java  | 153 ---------------
 .../contrib/joinlibrary/OuterRightJoinTest.java | 153 ---------------
 8 files changed, 635 insertions(+), 635 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
new file mode 100644
index 0000000..968a613
--- /dev/null
+++ b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.joinlibrary;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import com.google.common.base.Preconditions;
+
+/**
+ * Utility class with different versions of joins. All methods join two collections of
+ * key/value pairs (KV).
+ */
+public class Join {
+
+  /**
+   * Inner join of two collections of KV elements.
+   * @param leftCollection Left side collection to join.
+   * @param rightCollection Right side collection to join.
+   * @param <K> Type of the key for both collections
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   * @return A joined collection of KV where Key is the key and value is a
+   *         KV where Key is of type V1 and Value is type V2.
+   */
+  public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> innerJoin(
+    final PCollection<KV<K, V1>> leftCollection, final PCollection<KV<K, V2>> rightCollection) {
+    Preconditions.checkNotNull(leftCollection);
+    Preconditions.checkNotNull(rightCollection);
+
+    final TupleTag<V1> v1Tuple = new TupleTag<>();
+    final TupleTag<V2> v2Tuple = new TupleTag<>();
+
+    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
+      KeyedPCollectionTuple.of(v1Tuple, leftCollection)
+        .and(v2Tuple, rightCollection)
+        .apply(CoGroupByKey.<K>create());
+
+    return coGbkResultCollection.apply(ParDo.of(
+      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+        @Override
+        public void processElement(ProcessContext c) {
+          KV<K, CoGbkResult> e = c.element();
+
+          Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
+          Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
+
+          for (V1 leftValue : leftValuesIterable) {
+            for (V2 rightValue : rightValuesIterable) {
+              c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
+            }
+          }
+        }
+      }))
+      .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
+                           KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
+                                      ((KvCoder) rightCollection.getCoder()).getValueCoder())));
+  }
+
+  /**
+   * Left Outer Join of two collections of KV elements.
+   * @param leftCollection Left side collection to join.
+   * @param rightCollection Right side collection to join.
+   * @param nullValue Value to use as null value when right side does not match left side.
+   * @param <K> Type of the key for both collections
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   * @return A joined collection of KV where Key is the key and value is a
+   *         KV where Key is of type V1 and Value is type V2. Values that
+   *         should be null or empty are replaced with nullValue.
+   */
+  public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> leftOuterJoin(
+    final PCollection<KV<K, V1>> leftCollection,
+    final PCollection<KV<K, V2>> rightCollection,
+    final V2 nullValue) {
+    Preconditions.checkNotNull(leftCollection);
+    Preconditions.checkNotNull(rightCollection);
+    Preconditions.checkNotNull(nullValue);
+
+    final TupleTag<V1> v1Tuple = new TupleTag<>();
+    final TupleTag<V2> v2Tuple = new TupleTag<>();
+
+    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
+      KeyedPCollectionTuple.of(v1Tuple, leftCollection)
+        .and(v2Tuple, rightCollection)
+        .apply(CoGroupByKey.<K>create());
+
+    return coGbkResultCollection.apply(ParDo.of(
+      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+        @Override
+        public void processElement(ProcessContext c) {
+          KV<K, CoGbkResult> e = c.element();
+
+          Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
+          Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
+
+          for (V1 leftValue : leftValuesIterable) {
+            if (rightValuesIterable.iterator().hasNext()) {
+              for (V2 rightValue : rightValuesIterable) {
+                c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
+              }
+            } else {
+              c.output(KV.of(e.getKey(), KV.of(leftValue, nullValue)));
+            }
+          }
+        }
+      }))
+      .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
+                           KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
+                                      ((KvCoder) rightCollection.getCoder()).getValueCoder())));
+  }
+
+  /**
+   * Right Outer Join of two collections of KV elements.
+   * @param leftCollection Left side collection to join.
+   * @param rightCollection Right side collection to join.
+   * @param nullValue Value to use as null value when left side does not match right side.
+   * @param <K> Type of the key for both collections
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   * @return A joined collection of KV where Key is the key and value is a
+   *         KV where Key is of type V1 and Value is type V2. Keys that
+   *         should be null or empty are replaced with nullValue.
+   */
+  public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> rightOuterJoin(
+    final PCollection<KV<K, V1>> leftCollection,
+    final PCollection<KV<K, V2>> rightCollection,
+    final V1 nullValue) {
+    Preconditions.checkNotNull(leftCollection);
+    Preconditions.checkNotNull(rightCollection);
+    Preconditions.checkNotNull(nullValue);
+
+    final TupleTag<V1> v1Tuple = new TupleTag<>();
+    final TupleTag<V2> v2Tuple = new TupleTag<>();
+
+    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
+      KeyedPCollectionTuple.of(v1Tuple, leftCollection)
+        .and(v2Tuple, rightCollection)
+        .apply(CoGroupByKey.<K>create());
+
+    return coGbkResultCollection.apply(ParDo.of(
+      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+        @Override
+        public void processElement(ProcessContext c) {
+          KV<K, CoGbkResult> e = c.element();
+
+          Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
+          Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
+
+          for (V2 rightValue : rightValuesIterable) {
+            if (leftValuesIterable.iterator().hasNext()) {
+              for (V1 leftValue : leftValuesIterable) {
+                c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
+              }
+            } else {
+              c.output(KV.of(e.getKey(), KV.of(nullValue, rightValue)));
+            }
+          }
+        }
+      }))
+      .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
+                           KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
+                                      ((KvCoder) rightCollection.getCoder()).getValueCoder())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java b/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java
deleted file mode 100644
index 6421e97..0000000
--- a/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.joinlibrary;
-
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-import com.google.common.base.Preconditions;
-
-/**
- * Utility class with different versions of joins. All methods join two collections of
- * key/value pairs (KV).
- */
-public class Join {
-
-  /**
-   * Inner join of two collections of KV elements.
-   * @param leftCollection Left side collection to join.
-   * @param rightCollection Right side collection to join.
-   * @param <K> Type of the key for both collections
-   * @param <V1> Type of the values for the left collection.
-   * @param <V2> Type of the values for the right collection.
-   * @return A joined collection of KV where Key is the key and value is a
-   *         KV where Key is of type V1 and Value is type V2.
-   */
-  public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> innerJoin(
-    final PCollection<KV<K, V1>> leftCollection, final PCollection<KV<K, V2>> rightCollection) {
-    Preconditions.checkNotNull(leftCollection);
-    Preconditions.checkNotNull(rightCollection);
-
-    final TupleTag<V1> v1Tuple = new TupleTag<>();
-    final TupleTag<V2> v2Tuple = new TupleTag<>();
-
-    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
-      KeyedPCollectionTuple.of(v1Tuple, leftCollection)
-        .and(v2Tuple, rightCollection)
-        .apply(CoGroupByKey.<K>create());
-
-    return coGbkResultCollection.apply(ParDo.of(
-      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
-        @Override
-        public void processElement(ProcessContext c) {
-          KV<K, CoGbkResult> e = c.element();
-
-          Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
-          Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
-
-          for (V1 leftValue : leftValuesIterable) {
-            for (V2 rightValue : rightValuesIterable) {
-              c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
-            }
-          }
-        }
-      }))
-      .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
-                           KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
-                                      ((KvCoder) rightCollection.getCoder()).getValueCoder())));
-  }
-
-  /**
-   * Left Outer Join of two collections of KV elements.
-   * @param leftCollection Left side collection to join.
-   * @param rightCollection Right side collection to join.
-   * @param nullValue Value to use as null value when right side do not match left side.
-   * @param <K> Type of the key for both collections
-   * @param <V1> Type of the values for the left collection.
-   * @param <V2> Type of the values for the right collection.
-   * @return A joined collection of KV where Key is the key and value is a
-   *         KV where Key is of type V1 and Value is type V2. Values that
-   *         should be null or empty is replaced with nullValue.
-   */
-  public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> leftOuterJoin(
-    final PCollection<KV<K, V1>> leftCollection,
-    final PCollection<KV<K, V2>> rightCollection,
-    final V2 nullValue) {
-    Preconditions.checkNotNull(leftCollection);
-    Preconditions.checkNotNull(rightCollection);
-    Preconditions.checkNotNull(nullValue);
-
-    final TupleTag<V1> v1Tuple = new TupleTag<>();
-    final TupleTag<V2> v2Tuple = new TupleTag<>();
-
-    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
-      KeyedPCollectionTuple.of(v1Tuple, leftCollection)
-        .and(v2Tuple, rightCollection)
-        .apply(CoGroupByKey.<K>create());
-
-    return coGbkResultCollection.apply(ParDo.of(
-      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
-        @Override
-        public void processElement(ProcessContext c) {
-          KV<K, CoGbkResult> e = c.element();
-
-          Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
-          Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
-
-          for (V1 leftValue : leftValuesIterable) {
-            if (rightValuesIterable.iterator().hasNext()) {
-              for (V2 rightValue : rightValuesIterable) {
-                c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
-              }
-            } else {
-              c.output(KV.of(e.getKey(), KV.of(leftValue, nullValue)));
-            }
-          }
-        }
-      }))
-      .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
-                           KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
-                                      ((KvCoder) rightCollection.getCoder()).getValueCoder())));
-  }
-
-  /**
-   * Right Outer Join of two collections of KV elements.
-   * @param leftCollection Left side collection to join.
-   * @param rightCollection Right side collection to join.
-   * @param nullValue Value to use as null value when left side do not match right side.
-   * @param <K> Type of the key for both collections
-   * @param <V1> Type of the values for the left collection.
-   * @param <V2> Type of the values for the right collection.
-   * @return A joined collection of KV where Key is the key and value is a
-   *         KV where Key is of type V1 and Value is type V2. Keys that
-   *         should be null or empty is replaced with nullValue.
-   */
-  public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> rightOuterJoin(
-    final PCollection<KV<K, V1>> leftCollection,
-    final PCollection<KV<K, V2>> rightCollection,
-    final V1 nullValue) {
-    Preconditions.checkNotNull(leftCollection);
-    Preconditions.checkNotNull(rightCollection);
-    Preconditions.checkNotNull(nullValue);
-
-    final TupleTag<V1> v1Tuple = new TupleTag<>();
-    final TupleTag<V2> v2Tuple = new TupleTag<>();
-
-    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
-      KeyedPCollectionTuple.of(v1Tuple, leftCollection)
-        .and(v2Tuple, rightCollection)
-        .apply(CoGroupByKey.<K>create());
-
-    return coGbkResultCollection.apply(ParDo.of(
-      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
-        @Override
-        public void processElement(ProcessContext c) {
-          KV<K, CoGbkResult> e = c.element();
-
-          Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
-          Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
-
-          for (V2 rightValue : rightValuesIterable) {
-            if (leftValuesIterable.iterator().hasNext()) {
-              for (V1 leftValue : leftValuesIterable) {
-                c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
-              }
-            } else {
-              c.output(KV.of(e.getKey(), KV.of(nullValue, rightValue)));
-            }
-          }
-        }
-      }))
-      .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
-                           KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
-                                      ((KvCoder) rightCollection.getCoder()).getValueCoder())));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java
new file mode 100644
index 0000000..6622fdc
--- /dev/null
+++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.joinlibrary;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests the Inner Join functionality.
+ */
+public class InnerJoinTest {
+
+  Pipeline p;
+  List<KV<String, Long>> leftListOfKv;
+  List<KV<String, String>> listRightOfKv;
+  List<KV<String, KV<Long, String>>> expectedResult;
+
+  @Before
+  public void setup() {
+
+    p = TestPipeline.create();
+    leftListOfKv = new ArrayList<>();
+    listRightOfKv = new ArrayList<>();
+
+    expectedResult = new ArrayList<>();
+  }
+
+  @Test
+  public void testJoinOneToOneMapping() {
+    leftListOfKv.add(KV.of("Key1", 5L));
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection =
+        p.apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key1", "foo"));
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection =
+        p.apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+      leftCollection, rightCollection);
+
+    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToManyMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    listRightOfKv.add(KV.of("Key2", "gazonk"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+      leftCollection, rightCollection);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinManyToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    leftListOfKv.add(KV.of("Key2", 6L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+      leftCollection, rightCollection);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinNoneToNoneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key3", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+      leftCollection, rightCollection);
+
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+    p.run();
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinLeftCollectionNull() {
+    Join.innerJoin(null, p.apply(Create.of(listRightOfKv)));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinRightCollectionNull() {
+    Join.innerJoin(p.apply(Create.of(leftListOfKv)), null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java
new file mode 100644
index 0000000..91b0740
--- /dev/null
+++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.joinlibrary;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Tests the Outer Left Join functionality.
+ */
+public class OuterLeftJoinTest {
+
+  Pipeline p;
+  List<KV<String, Long>> leftListOfKv;
+  List<KV<String, String>> listRightOfKv;
+  List<KV<String, KV<Long, String>>> expectedResult;
+
+  @Before
+  public void setup() {
+
+    p = TestPipeline.create();
+    leftListOfKv = new ArrayList<>();
+    listRightOfKv = new ArrayList<>();
+
+    expectedResult = new ArrayList<>();
+  }
+
+  @Test
+  public void testJoinOneToOneMapping() {
+    leftListOfKv.add(KV.of("Key1", 5L));
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key1", "foo"));
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+      leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToManyMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    listRightOfKv.add(KV.of("Key2", "gazonk"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+      leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinManyToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    leftListOfKv.add(KV.of("Key2", 6L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+      leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToNoneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key3", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+      leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+    p.run();
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinLeftCollectionNull() {
+    Join.leftOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinRightCollectionNull() {
+    Join.leftOuterJoin(p.apply(Create.of(leftListOfKv)), null, "");
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinNullValueIsNull() {
+    Join.leftOuterJoin(
+        p.apply("CreateLeft", Create.of(leftListOfKv)),
+        p.apply("CreateRight", Create.of(listRightOfKv)),
+        null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java
new file mode 100644
index 0000000..7977df7
--- /dev/null
+++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.joinlibrary;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Tests Outer Right Join functionality.
+ */
+public class OuterRightJoinTest {
+
+  Pipeline p;
+  List<KV<String, Long>> leftListOfKv;
+  List<KV<String, String>> listRightOfKv;
+  List<KV<String, KV<Long, String>>> expectedResult;
+
+  @Before
+  public void setup() {
+
+    p = TestPipeline.create();
+    leftListOfKv = new ArrayList<>();
+    listRightOfKv = new ArrayList<>();
+
+    expectedResult = new ArrayList<>();
+  }
+
+  @Test
+  public void testJoinOneToOneMapping() {
+    leftListOfKv.add(KV.of("Key1", 5L));
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key1", "foo"));
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+      leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToManyMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    listRightOfKv.add(KV.of("Key2", "gazonk"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+      leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinManyToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    leftListOfKv.add(KV.of("Key2", 6L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+      leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinNoneToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key3", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+      leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key3", KV.of(-1L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+    p.run();
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinLeftCollectionNull() {
+    Join.rightOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinRightCollectionNull() {
+    Join.rightOuterJoin(p.apply(Create.of(leftListOfKv)), null, -1L);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinNullValueIsNull() {
+    Join.rightOuterJoin(
+        p.apply("CreateLeft", Create.of(leftListOfKv)),
+        p.apply("CreateRight", Create.of(listRightOfKv)),
+        null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
deleted file mode 100644
index 99e9c4b..0000000
--- a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.joinlibrary;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This test Inner Join functionality.
- */
-public class InnerJoinTest {
-
-  Pipeline p;
-  List<KV<String, Long>> leftListOfKv;
-  List<KV<String, String>> listRightOfKv;
-  List<KV<String, KV<Long, String>>> expectedResult;
-
-  @Before
-  public void setup() {
-
-    p = TestPipeline.create();
-    leftListOfKv = new ArrayList<>();
-    listRightOfKv = new ArrayList<>();
-
-    expectedResult = new ArrayList<>();
-  }
-
-  @Test
-  public void testJoinOneToOneMapping() {
-    leftListOfKv.add(KV.of("Key1", 5L));
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection =
-        p.apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key1", "foo"));
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    PCollection<KV<String, String>> rightCollection =
-        p.apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
-      leftCollection, rightCollection);
-
-    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinOneToManyMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    listRightOfKv.add(KV.of("Key2", "gazonk"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
-      leftCollection, rightCollection);
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinManyToOneMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    leftListOfKv.add(KV.of("Key2", 6L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
-      leftCollection, rightCollection);
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinNoneToNoneMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key3", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
-      leftCollection, rightCollection);
-
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-    p.run();
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinLeftCollectionNull() {
-    Join.innerJoin(null, p.apply(Create.of(listRightOfKv)));
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinRightCollectionNull() {
-    Join.innerJoin(p.apply(Create.of(leftListOfKv)), null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
deleted file mode 100644
index ca09136..0000000
--- a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.joinlibrary;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * This test Outer Left Join functionality.
- */
-public class OuterLeftJoinTest {
-
-  Pipeline p;
-  List<KV<String, Long>> leftListOfKv;
-  List<KV<String, String>> listRightOfKv;
-  List<KV<String, KV<Long, String>>> expectedResult;
-
-  @Before
-  public void setup() {
-
-    p = TestPipeline.create();
-    leftListOfKv = new ArrayList<>();
-    listRightOfKv = new ArrayList<>();
-
-    expectedResult = new ArrayList<>();
-  }
-
-  @Test
-  public void testJoinOneToOneMapping() {
-    leftListOfKv.add(KV.of("Key1", 5L));
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key1", "foo"));
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
-      leftCollection, rightCollection, "");
-
-    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinOneToManyMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    listRightOfKv.add(KV.of("Key2", "gazonk"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
-      leftCollection, rightCollection, "");
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinManyToOneMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    leftListOfKv.add(KV.of("Key2", 6L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
-      leftCollection, rightCollection, "");
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinOneToNoneMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key3", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
-      leftCollection, rightCollection, "");
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-    p.run();
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinLeftCollectionNull() {
-    Join.leftOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinRightCollectionNull() {
-    Join.leftOuterJoin(p.apply(Create.of(leftListOfKv)), null, "");
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinNullValueIsNull() {
-    Join.leftOuterJoin(
-        p.apply("CreateLeft", Create.of(leftListOfKv)),
-        p.apply("CreateRight", Create.of(listRightOfKv)),
-        null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
deleted file mode 100644
index 86028ac..0000000
--- a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.joinlibrary;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * This test Outer Right Join functionality.
- */
-public class OuterRightJoinTest {
-
-  Pipeline p;
-  List<KV<String, Long>> leftListOfKv;
-  List<KV<String, String>> listRightOfKv;
-  List<KV<String, KV<Long, String>>> expectedResult;
-
-  @Before
-  public void setup() {
-
-    p = TestPipeline.create();
-    leftListOfKv = new ArrayList<>();
-    listRightOfKv = new ArrayList<>();
-
-    expectedResult = new ArrayList<>();
-  }
-
-  @Test
-  public void testJoinOneToOneMapping() {
-    leftListOfKv.add(KV.of("Key1", 5L));
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key1", "foo"));
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
-      leftCollection, rightCollection, -1L);
-
-    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinOneToManyMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    listRightOfKv.add(KV.of("Key2", "gazonk"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
-      leftCollection, rightCollection, -1L);
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinManyToOneMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    leftListOfKv.add(KV.of("Key2", 6L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
-      leftCollection, rightCollection, -1L);
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinNoneToOneMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key3", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
-      leftCollection, rightCollection, -1L);
-
-    expectedResult.add(KV.of("Key3", KV.of(-1L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-    p.run();
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinLeftCollectionNull() {
-    Join.rightOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinRightCollectionNull() {
-    Join.rightOuterJoin(p.apply(Create.of(leftListOfKv)), null, -1L);
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinNullValueIsNull() {
-    Join.rightOuterJoin(
-        p.apply("CreateLeft", Create.of(leftListOfKv)),
-        p.apply("CreateRight", Create.of(listRightOfKv)),
-        null);
-  }
-}


[3/3] incubator-beam git commit: This closes #240

Posted by bc...@apache.org.
This closes #240


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0f7b8161
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0f7b8161
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0f7b8161

Branch: refs/heads/master
Commit: 0f7b8161546c0e160cb1722275a698c15f5aafc1
Parents: d5814a3 e405941
Author: bchambers <bc...@google.com>
Authored: Mon Apr 25 17:32:30 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Mon Apr 25 17:32:30 2016 -0700

----------------------------------------------------------------------
 contrib/README.md                               |  53 ------
 contrib/hadoop/AUTHORS.md                       |   7 -
 contrib/join-library/AUTHORS.md                 |   6 -
 contrib/join-library/README.md                  |  33 ----
 contrib/join-library/pom.xml                    | 185 ------------------
 .../org/apache/contrib/joinlibrary/Join.java    | 186 -------------------
 .../contrib/joinlibrary/InnerJoinTest.java      | 143 --------------
 .../contrib/joinlibrary/OuterLeftJoinTest.java  | 153 ---------------
 .../contrib/joinlibrary/OuterRightJoinTest.java | 153 ---------------
 sdks/java/extensions/join-library/README.md     |  33 ++++
 sdks/java/extensions/join-library/pom.xml       | 111 +++++++++++
 .../beam/sdk/extensions/joinlibrary/Join.java   | 186 +++++++++++++++++++
 .../extensions/joinlibrary/InnerJoinTest.java   | 143 ++++++++++++++
 .../joinlibrary/OuterLeftJoinTest.java          | 153 +++++++++++++++
 .../joinlibrary/OuterRightJoinTest.java         | 153 +++++++++++++++
 sdks/java/extensions/pom.xml                    |  40 ++++
 sdks/java/pom.xml                               |   1 +
 17 files changed, 820 insertions(+), 919 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-beam git commit: Move contrib/join-library to sdks/java/extensions.

Posted by bc...@apache.org.
Move contrib/join-library to sdks/java/extensions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e2ca8890
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e2ca8890
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e2ca8890

Branch: refs/heads/master
Commit: e2ca88908097e58c4a4e022a86de7a23d9412850
Parents: d5814a3
Author: Davor Bonaci <da...@google.com>
Authored: Mon Apr 25 16:42:31 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Mon Apr 25 17:32:23 2016 -0700

----------------------------------------------------------------------
 contrib/README.md                               |  53 ------
 contrib/hadoop/AUTHORS.md                       |   7 -
 contrib/join-library/AUTHORS.md                 |   6 -
 contrib/join-library/README.md                  |  33 ----
 contrib/join-library/pom.xml                    | 185 ------------------
 .../org/apache/contrib/joinlibrary/Join.java    | 186 -------------------
 .../contrib/joinlibrary/InnerJoinTest.java      | 143 --------------
 .../contrib/joinlibrary/OuterLeftJoinTest.java  | 153 ---------------
 .../contrib/joinlibrary/OuterRightJoinTest.java | 153 ---------------
 sdks/java/extensions/join-library/README.md     |  33 ++++
 sdks/java/extensions/join-library/pom.xml       | 111 +++++++++++
 .../org/apache/contrib/joinlibrary/Join.java    | 186 +++++++++++++++++++
 .../contrib/joinlibrary/InnerJoinTest.java      | 143 ++++++++++++++
 .../contrib/joinlibrary/OuterLeftJoinTest.java  | 153 +++++++++++++++
 .../contrib/joinlibrary/OuterRightJoinTest.java | 153 +++++++++++++++
 sdks/java/extensions/pom.xml                    |  40 ++++
 sdks/java/pom.xml                               |   1 +
 17 files changed, 820 insertions(+), 919 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/README.md
----------------------------------------------------------------------
diff --git a/contrib/README.md b/contrib/README.md
deleted file mode 100644
index b99cf46..0000000
--- a/contrib/README.md
+++ /dev/null
@@ -1,53 +0,0 @@
-# Community contributions
-
-This directory hosts a wide variety of community contributions that may be
-useful to other users of
-[Google Cloud Dataflow](https://cloud.google.com/dataflow/),
-but may not be appropriate or ready yet for inclusion into the
-[mainline SDK](https://github.com/GoogleCloudPlatform/DataflowJavaSDK/) or a
-separate Google-maintained artifact.
-
-## Organization
-
-Each subdirectory represents a logically separate and independent module.
-Preferably, the code is hosted directly in this repository. When appropriate, we
-are also open to linking external repositories via
-[`submodule`](http://git-scm.com/docs/git-submodule/) functionality within Git.
-
-While we are happy to host individual modules to provide additional value to all
-Cloud Dataflow users, the modules are _maintained solely by their respective
-authors_. We will make sure that modules are related to Cloud Dataflow, that
-they are distributed under the same license as the mainline SDK, and provide
-some guidance to the authors to make the quality as high as possible.
-
-We __cannot__, however, provide _any_ guarantees about correctness,
-compatibility, performance, support, test coverage, maintenance or future
-availability of individual modules hosted here.
-
-## Process
-
-In general, we recommend to get in touch with us through the issue tracker
-first. That way we can help out and possibly guide you. Coordinating up front
-makes it much easier to avoid frustration later on.
-
-We welcome pull requests with a new module from everyone. Every module must be
-related to Cloud Dataflow and must have an informative README.md file. We will
-provide general guidance, but usually won't be reviewing the module in detail.
-We reserve the right to refuse acceptance to any module, or remove it at any
-time in the future.
-
-We also welcome improvements to an existing module from everyone. We'll often
-wait for comments from the primary author of the module before merging a pull
-request from a non-primary author.
-
-As the module matures, we may choose to pull it directly into the mainline SDK
-or promote it to a Google-managed artifact.
-
-## Licensing
-
-We require all contributors to sign the Contributor License Agreement, exactly
-as we require for any contributions to the mainline SDK. More information is
-available in our [CONTRIBUTING.md](https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/CONTRIBUTING.md)
-file.
-
-_Thank you for your contribution to the Cloud Dataflow community!_

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/hadoop/AUTHORS.md
----------------------------------------------------------------------
diff --git a/contrib/hadoop/AUTHORS.md b/contrib/hadoop/AUTHORS.md
deleted file mode 100644
index 6effdb9..0000000
--- a/contrib/hadoop/AUTHORS.md
+++ /dev/null
@@ -1,7 +0,0 @@
-# Authors of 'hadoop' module
-
-The following is the official list of authors for copyright purposes of this community-contributed module.
-
-    Cloudera
-    Tom White, tom [at] cloudera [dot] com
-    Google Inc.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/join-library/AUTHORS.md
----------------------------------------------------------------------
diff --git a/contrib/join-library/AUTHORS.md b/contrib/join-library/AUTHORS.md
deleted file mode 100644
index d32b6a7..0000000
--- a/contrib/join-library/AUTHORS.md
+++ /dev/null
@@ -1,6 +0,0 @@
-# Authors of join-library
-
-The following is the official list of authors for copyright purposes of this community-contributed module.
-
-    Google Inc.
-    Magnus Runesson, M.Runesson [at] gmail [dot] com

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/join-library/README.md
----------------------------------------------------------------------
diff --git a/contrib/join-library/README.md b/contrib/join-library/README.md
deleted file mode 100644
index 8e2a011..0000000
--- a/contrib/join-library/README.md
+++ /dev/null
@@ -1,33 +0,0 @@
-Join-library
-============
-
-Join-library provides inner join, outer left and right join functions to
-Google Cloud Dataflow. The aim is to simplify the most common cases of join to a
-simple function call.
-
-The functions are generic so it supports join of any types supported by
-Dataflow. Input to the join functions are PCollections of Key/Values. Both the
-left and right PCollections need the same type for the key. All the join
-functions return a Key/Value where Key is the join key and value is
-a Key/Value where the key is the left value and right is the value.
-
-In the cases of outer join, since null cannot be serialized the user have
-to provide a value that represent null for that particular use case.
-
-Example how to use join-library:
-
-    PCollection<KV<String, String>> leftPcollection = ...
-    PCollection<KV<String, Long>> rightPcollection = ...
-
-    PCollection<KV<String, KV<String, Long>>> joinedPcollection =
-      Join.innerJoin(leftPcollection, rightPcollection);
-
-Join-library can be found on maven-central:
-
-    <dependency>
-      <groupId>org.linuxalert.dataflow</groupId>
-      <artifactId>google-cloud-dataflow-java-contrib-joinlibrary</artifactId>
-      <version>0.0.3</version>
-    </dependency>
-
-Questions or comments: `M.Runesson [at] gmail [dot] com`

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/join-library/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/join-library/pom.xml b/contrib/join-library/pom.xml
deleted file mode 100644
index 090f445..0000000
--- a/contrib/join-library/pom.xml
+++ /dev/null
@@ -1,185 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-  <modelVersion>4.0.0</modelVersion>
-
-  <groupId>org.linuxalert.dataflow</groupId>
-  <artifactId>google-cloud-dataflow-java-contrib-joinlibrary</artifactId>
-  <name>Google Cloud Dataflow Join Library</name>
-  <description>Library with generic join functions for Dataflow.</description>
-  <url>https://github.com/GoogleCloudPlatform/DataflowJavaSDK/tree/master/contrib/join-library</url>
-  <developers>
-    <developer>
-      <organization>Google Inc.</organization>
-      <organizationUrl>http://www.google.com</organizationUrl>
-    </developer>
-    <developer>
-      <name>Magnus Runesson</name>
-      <email>M (dot) Runesson (at) gmail (dot) com</email>
-      <roles>
-        <role>Developer</role>
-      </roles>
-      <timezone>+1</timezone>
-    </developer>
-  </developers>
-  <contributors>
-    <contributor>
-      <name>Magnus Runesson</name>
-      <email>M (dot) Runesson (at) gmail (dot) com</email>
-      <url>https://github.com/mrunesson</url>
-    </contributor>
-  </contributors>
-  <version>0.0.4</version>
-  <packaging>jar</packaging>
-
-  <licenses>
-    <license>
-      <name>Apache License, Version 2.0</name>
-      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-      <distribution>repo</distribution>
-    </license>
-  </licenses>
-
-  <scm>
-    <connection>scm:git:git@github.com:GoogleCloudPlatform/DataflowJavaSDK.git</connection>
-    <developerConnection>scm:git:git@github.com:GoogleCloudPlatform/DataflowJavaSDK.git</developerConnection>
-    <url>https://github.com/GoogleCloudPlatform/DataflowJavaSDK/tree/master/contrib/join-library</url>
-  </scm>
-
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <beam-version>[0.1.0, 1.0.0)</beam-version>
-  </properties>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <version>3.2</version>
-        <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
-        </configuration>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-checkstyle-plugin</artifactId>
-        <version>2.17</version>
-        <configuration>
-          <configLocation>../../sdks/java/checkstyle.xml</configLocation>
-          <consoleOutput>true</consoleOutput>
-          <failOnViolation>true</failOnViolation>
-          <includeTestSourceDirectory>true</includeTestSourceDirectory>
-        </configuration>
-        <executions>
-          <execution>
-            <phase>validate</phase>
-            <goals>
-              <goal>check</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-source-plugin</artifactId>
-        <version>2.4</version>
-        <executions>
-          <execution>
-            <id>attach-sources</id>
-            <goals>
-              <goal>jar</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-javadoc-plugin</artifactId>
-        <version>2.10.3</version>
-        <executions>
-          <execution>
-            <id>attach-javadocs</id>
-            <goals>
-              <goal>jar</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-
-      <plugin>
-        <groupId>org.sonatype.plugins</groupId>
-        <artifactId>nexus-staging-maven-plugin</artifactId>
-        <version>1.6.3</version>
-        <extensions>true</extensions>
-        <configuration>
-          <serverId>ossrh</serverId>
-          <nexusUrl>https://oss.sonatype.org/</nexusUrl>
-          <autoReleaseAfterClose>true</autoReleaseAfterClose>
-        </configuration>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-gpg-plugin</artifactId>
-        <version>1.5</version>
-        <executions>
-          <execution>
-            <id>sign-artifacts</id>
-            <phase>verify</phase>
-            <goals>
-              <goal>sign</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-
-  <distributionManagement>
-    <snapshotRepository>
-      <id>ossrh</id>
-      <url>https://oss.sonatype.org/content/repositories/snapshots</url>
-    </snapshotRepository>
-    <repository>
-      <id>ossrh</id>
-      <url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
-    </repository>
-  </distributionManagement>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>java-sdk-all</artifactId>
-      <version>${beam-version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <version>19.0</version>
-    </dependency>
-
-    <!-- Dependency for tests -->
-    <dependency>
-      <groupId>org.hamcrest</groupId>
-      <artifactId>hamcrest-all</artifactId>
-      <version>1.3</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <version>4.12</version>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java
----------------------------------------------------------------------
diff --git a/contrib/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java b/contrib/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java
deleted file mode 100644
index 6421e97..0000000
--- a/contrib/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.joinlibrary;
-
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-import com.google.common.base.Preconditions;
-
-/**
- * Utility class with different versions of joins. All methods join two collections of
- * key/value pairs (KV).
- */
-public class Join {
-
-  /**
-   * Inner join of two collections of KV elements.
-   * @param leftCollection Left side collection to join.
-   * @param rightCollection Right side collection to join.
-   * @param <K> Type of the key for both collections
-   * @param <V1> Type of the values for the left collection.
-   * @param <V2> Type of the values for the right collection.
-   * @return A joined collection of KV where Key is the key and value is a
-   *         KV where Key is of type V1 and Value is type V2.
-   */
-  public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> innerJoin(
-    final PCollection<KV<K, V1>> leftCollection, final PCollection<KV<K, V2>> rightCollection) {
-    Preconditions.checkNotNull(leftCollection);
-    Preconditions.checkNotNull(rightCollection);
-
-    final TupleTag<V1> v1Tuple = new TupleTag<>();
-    final TupleTag<V2> v2Tuple = new TupleTag<>();
-
-    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
-      KeyedPCollectionTuple.of(v1Tuple, leftCollection)
-        .and(v2Tuple, rightCollection)
-        .apply(CoGroupByKey.<K>create());
-
-    return coGbkResultCollection.apply(ParDo.of(
-      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
-        @Override
-        public void processElement(ProcessContext c) {
-          KV<K, CoGbkResult> e = c.element();
-
-          Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
-          Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
-
-          for (V1 leftValue : leftValuesIterable) {
-            for (V2 rightValue : rightValuesIterable) {
-              c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
-            }
-          }
-        }
-      }))
-      .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
-                           KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
-                                      ((KvCoder) rightCollection.getCoder()).getValueCoder())));
-  }
-
-  /**
-   * Left Outer Join of two collections of KV elements.
-   * @param leftCollection Left side collection to join.
-   * @param rightCollection Right side collection to join.
-   * @param nullValue Value to use as null value when right side do not match left side.
-   * @param <K> Type of the key for both collections
-   * @param <V1> Type of the values for the left collection.
-   * @param <V2> Type of the values for the right collection.
-   * @return A joined collection of KV where Key is the key and value is a
-   *         KV where Key is of type V1 and Value is type V2. Values that
-   *         should be null or empty is replaced with nullValue.
-   */
-  public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> leftOuterJoin(
-    final PCollection<KV<K, V1>> leftCollection,
-    final PCollection<KV<K, V2>> rightCollection,
-    final V2 nullValue) {
-    Preconditions.checkNotNull(leftCollection);
-    Preconditions.checkNotNull(rightCollection);
-    Preconditions.checkNotNull(nullValue);
-
-    final TupleTag<V1> v1Tuple = new TupleTag<>();
-    final TupleTag<V2> v2Tuple = new TupleTag<>();
-
-    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
-      KeyedPCollectionTuple.of(v1Tuple, leftCollection)
-        .and(v2Tuple, rightCollection)
-        .apply(CoGroupByKey.<K>create());
-
-    return coGbkResultCollection.apply(ParDo.of(
-      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
-        @Override
-        public void processElement(ProcessContext c) {
-          KV<K, CoGbkResult> e = c.element();
-
-          Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
-          Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
-
-          for (V1 leftValue : leftValuesIterable) {
-            if (rightValuesIterable.iterator().hasNext()) {
-              for (V2 rightValue : rightValuesIterable) {
-                c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
-              }
-            } else {
-              c.output(KV.of(e.getKey(), KV.of(leftValue, nullValue)));
-            }
-          }
-        }
-      }))
-      .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
-                           KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
-                                      ((KvCoder) rightCollection.getCoder()).getValueCoder())));
-  }
-
-  /**
-   * Right Outer Join of two collections of KV elements.
-   * @param leftCollection Left side collection to join.
-   * @param rightCollection Right side collection to join.
-   * @param nullValue Value to use as null value when left side do not match right side.
-   * @param <K> Type of the key for both collections
-   * @param <V1> Type of the values for the left collection.
-   * @param <V2> Type of the values for the right collection.
-   * @return A joined collection of KV where Key is the key and value is a
-   *         KV where Key is of type V1 and Value is type V2. Keys that
-   *         should be null or empty is replaced with nullValue.
-   */
-  public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> rightOuterJoin(
-    final PCollection<KV<K, V1>> leftCollection,
-    final PCollection<KV<K, V2>> rightCollection,
-    final V1 nullValue) {
-    Preconditions.checkNotNull(leftCollection);
-    Preconditions.checkNotNull(rightCollection);
-    Preconditions.checkNotNull(nullValue);
-
-    final TupleTag<V1> v1Tuple = new TupleTag<>();
-    final TupleTag<V2> v2Tuple = new TupleTag<>();
-
-    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
-      KeyedPCollectionTuple.of(v1Tuple, leftCollection)
-        .and(v2Tuple, rightCollection)
-        .apply(CoGroupByKey.<K>create());
-
-    return coGbkResultCollection.apply(ParDo.of(
-      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
-        @Override
-        public void processElement(ProcessContext c) {
-          KV<K, CoGbkResult> e = c.element();
-
-          Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
-          Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
-
-          for (V2 rightValue : rightValuesIterable) {
-            if (leftValuesIterable.iterator().hasNext()) {
-              for (V1 leftValue : leftValuesIterable) {
-                c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
-              }
-            } else {
-              c.output(KV.of(e.getKey(), KV.of(nullValue, rightValue)));
-            }
-          }
-        }
-      }))
-      .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
-                           KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
-                                      ((KvCoder) rightCollection.getCoder()).getValueCoder())));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
----------------------------------------------------------------------
diff --git a/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java b/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
deleted file mode 100644
index 99e9c4b..0000000
--- a/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.joinlibrary;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This test Inner Join functionality.
- */
-public class InnerJoinTest {
-
-  Pipeline p;
-  List<KV<String, Long>> leftListOfKv;
-  List<KV<String, String>> listRightOfKv;
-  List<KV<String, KV<Long, String>>> expectedResult;
-
-  @Before
-  public void setup() {
-
-    p = TestPipeline.create();
-    leftListOfKv = new ArrayList<>();
-    listRightOfKv = new ArrayList<>();
-
-    expectedResult = new ArrayList<>();
-  }
-
-  @Test
-  public void testJoinOneToOneMapping() {
-    leftListOfKv.add(KV.of("Key1", 5L));
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection =
-        p.apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key1", "foo"));
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    PCollection<KV<String, String>> rightCollection =
-        p.apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
-      leftCollection, rightCollection);
-
-    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinOneToManyMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    listRightOfKv.add(KV.of("Key2", "gazonk"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
-      leftCollection, rightCollection);
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinManyToOneMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    leftListOfKv.add(KV.of("Key2", 6L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
-      leftCollection, rightCollection);
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinNoneToNoneMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key3", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
-      leftCollection, rightCollection);
-
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-    p.run();
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinLeftCollectionNull() {
-    Join.innerJoin(null, p.apply(Create.of(listRightOfKv)));
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinRightCollectionNull() {
-    Join.innerJoin(p.apply(Create.of(leftListOfKv)), null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java b/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
deleted file mode 100644
index ca09136..0000000
--- a/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.joinlibrary;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * This test Outer Left Join functionality.
- */
-public class OuterLeftJoinTest {
-
-  Pipeline p;
-  List<KV<String, Long>> leftListOfKv;
-  List<KV<String, String>> listRightOfKv;
-  List<KV<String, KV<Long, String>>> expectedResult;
-
-  @Before
-  public void setup() {
-
-    p = TestPipeline.create();
-    leftListOfKv = new ArrayList<>();
-    listRightOfKv = new ArrayList<>();
-
-    expectedResult = new ArrayList<>();
-  }
-
-  @Test
-  public void testJoinOneToOneMapping() {
-    leftListOfKv.add(KV.of("Key1", 5L));
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key1", "foo"));
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
-      leftCollection, rightCollection, "");
-
-    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinOneToManyMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    listRightOfKv.add(KV.of("Key2", "gazonk"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
-      leftCollection, rightCollection, "");
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinManyToOneMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    leftListOfKv.add(KV.of("Key2", 6L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
-      leftCollection, rightCollection, "");
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinOneToNoneMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key3", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
-      leftCollection, rightCollection, "");
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-    p.run();
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinLeftCollectionNull() {
-    Join.leftOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinRightCollectionNull() {
-    Join.leftOuterJoin(p.apply(Create.of(leftListOfKv)), null, "");
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinNullValueIsNull() {
-    Join.leftOuterJoin(
-        p.apply("CreateLeft", Create.of(leftListOfKv)),
-        p.apply("CreateRight", Create.of(listRightOfKv)),
-        null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
----------------------------------------------------------------------
diff --git a/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java b/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
deleted file mode 100644
index 86028ac..0000000
--- a/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.joinlibrary;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * This test Outer Right Join functionality.
- */
-public class OuterRightJoinTest {
-
-  Pipeline p;
-  List<KV<String, Long>> leftListOfKv;
-  List<KV<String, String>> listRightOfKv;
-  List<KV<String, KV<Long, String>>> expectedResult;
-
-  @Before
-  public void setup() {
-
-    p = TestPipeline.create();
-    leftListOfKv = new ArrayList<>();
-    listRightOfKv = new ArrayList<>();
-
-    expectedResult = new ArrayList<>();
-  }
-
-  @Test
-  public void testJoinOneToOneMapping() {
-    leftListOfKv.add(KV.of("Key1", 5L));
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key1", "foo"));
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
-      leftCollection, rightCollection, -1L);
-
-    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinOneToManyMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    listRightOfKv.add(KV.of("Key2", "gazonk"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
-      leftCollection, rightCollection, -1L);
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinManyToOneMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    leftListOfKv.add(KV.of("Key2", 6L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
-      leftCollection, rightCollection, -1L);
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinNoneToOneMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key3", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
-      leftCollection, rightCollection, -1L);
-
-    expectedResult.add(KV.of("Key3", KV.of(-1L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-    p.run();
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinLeftCollectionNull() {
-    Join.rightOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinRightCollectionNull() {
-    Join.rightOuterJoin(p.apply(Create.of(leftListOfKv)), null, -1L);
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinNullValueIsNull() {
-    Join.rightOuterJoin(
-        p.apply("CreateLeft", Create.of(leftListOfKv)),
-        p.apply("CreateRight", Create.of(listRightOfKv)),
-        null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/extensions/join-library/README.md
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/README.md b/sdks/java/extensions/join-library/README.md
new file mode 100644
index 0000000..8e2a011
--- /dev/null
+++ b/sdks/java/extensions/join-library/README.md
@@ -0,0 +1,33 @@
+Join-library
+============
+
+Join-library provides inner join, left outer join, and right outer join
+functions for Google Cloud Dataflow. The aim is to reduce the most common join
+cases to a simple function call.
+
+The functions are generic, so they support joins of any types supported by
+Dataflow. Inputs to the join functions are PCollections of Key/Values. Both the
+left and right PCollections need the same type for the key. All the join
+functions return a Key/Value where the key is the join key and the value is
+a Key/Value whose key is the left value and whose value is the right value.
+
+In the case of an outer join, since null cannot be serialized, the user has
+to provide a value that represents null for that particular use case.
+
+Example of how to use join-library:
+
+    PCollection<KV<String, String>> leftPcollection = ...
+    PCollection<KV<String, Long>> rightPcollection = ...
+
+    PCollection<KV<String, KV<String, Long>>> joinedPcollection =
+      Join.innerJoin(leftPcollection, rightPcollection);
+
+Join-library can be found on maven-central:
+
+    <dependency>
+      <groupId>org.linuxalert.dataflow</groupId>
+      <artifactId>google-cloud-dataflow-java-contrib-joinlibrary</artifactId>
+      <version>0.0.3</version>
+    </dependency>
+
+Questions or comments: `M.Runesson [at] gmail [dot] com`

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/extensions/join-library/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/pom.xml b/sdks/java/extensions/join-library/pom.xml
new file mode 100644
index 0000000..2765276
--- /dev/null
+++ b/sdks/java/extensions/join-library/pom.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>extensions-parent</artifactId>
+    <version>0.1.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>join-library</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: Extensions :: Join library</name>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.2</version>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <version>2.17</version>
+        <configuration>
+          <configLocation>../../checkstyle.xml</configLocation>
+          <consoleOutput>true</consoleOutput>
+          <failOnViolation>true</failOnViolation>
+          <includeTestSourceDirectory>true</includeTestSourceDirectory>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>validate</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <id>attach-sources</id>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>java-sdk-all</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>19.0</version>
+    </dependency>
+
+    <!-- Dependency for tests -->
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <version>1.3</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.12</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java b/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java
new file mode 100644
index 0000000..6421e97
--- /dev/null
+++ b/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.contrib.joinlibrary;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import com.google.common.base.Preconditions;
+
+/**
+ * Utility class with different versions of joins. All methods join two collections of
+ * key/value pairs (KV).
+ */
+public class Join {
+
+  /**
+   * Inner join of two collections of KV elements.
+   * @param leftCollection Left side collection to join.
+   * @param rightCollection Right side collection to join.
+   * @param <K> Type of the key for both collections
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   * @return A joined collection of KV where Key is the key and value is a
+   *         KV where Key is of type V1 and Value is type V2.
+   */
+  public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> innerJoin(
+    final PCollection<KV<K, V1>> leftCollection, final PCollection<KV<K, V2>> rightCollection) {
+    Preconditions.checkNotNull(leftCollection);
+    Preconditions.checkNotNull(rightCollection);
+
+    final TupleTag<V1> v1Tuple = new TupleTag<>();
+    final TupleTag<V2> v2Tuple = new TupleTag<>();
+
+    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
+      KeyedPCollectionTuple.of(v1Tuple, leftCollection)
+        .and(v2Tuple, rightCollection)
+        .apply(CoGroupByKey.<K>create());
+
+    return coGbkResultCollection.apply(ParDo.of(
+      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+        @Override
+        public void processElement(ProcessContext c) {
+          KV<K, CoGbkResult> e = c.element();
+
+          Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
+          Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
+
+          for (V1 leftValue : leftValuesIterable) {
+            for (V2 rightValue : rightValuesIterable) {
+              c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
+            }
+          }
+        }
+      }))
+      .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
+                           KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
+                                      ((KvCoder) rightCollection.getCoder()).getValueCoder())));
+  }
+
+  /**
+   * Left Outer Join of two collections of KV elements.
+   * @param leftCollection Left side collection to join.
+   * @param rightCollection Right side collection to join.
+   * @param nullValue Value to use as null value when right side does not match left side.
+   * @param <K> Type of the key for both collections
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   * @return A joined collection of KV where Key is the key and value is a
+   *         KV where Key is of type V1 and Value is type V2. Values that
+   *         should be null or empty are replaced with nullValue.
+   */
+  public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> leftOuterJoin(
+    final PCollection<KV<K, V1>> leftCollection,
+    final PCollection<KV<K, V2>> rightCollection,
+    final V2 nullValue) {
+    Preconditions.checkNotNull(leftCollection);
+    Preconditions.checkNotNull(rightCollection);
+    Preconditions.checkNotNull(nullValue);
+
+    final TupleTag<V1> v1Tuple = new TupleTag<>();
+    final TupleTag<V2> v2Tuple = new TupleTag<>();
+
+    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
+      KeyedPCollectionTuple.of(v1Tuple, leftCollection)
+        .and(v2Tuple, rightCollection)
+        .apply(CoGroupByKey.<K>create());
+
+    return coGbkResultCollection.apply(ParDo.of(
+      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+        @Override
+        public void processElement(ProcessContext c) {
+          KV<K, CoGbkResult> e = c.element();
+
+          Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
+          Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
+
+          for (V1 leftValue : leftValuesIterable) {
+            if (rightValuesIterable.iterator().hasNext()) {
+              for (V2 rightValue : rightValuesIterable) {
+                c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
+              }
+            } else {
+              c.output(KV.of(e.getKey(), KV.of(leftValue, nullValue)));
+            }
+          }
+        }
+      }))
+      .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
+                           KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
+                                      ((KvCoder) rightCollection.getCoder()).getValueCoder())));
+  }
+
+  /**
+   * Right Outer Join of two collections of KV elements.
+   * @param leftCollection Left side collection to join.
+   * @param rightCollection Right side collection to join.
+   * @param nullValue Value to use as null value when left side does not match right side.
+   * @param <K> Type of the key for both collections
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   * @return A joined collection of KV where Key is the key and value is a
+   *         KV where Key is of type V1 and Value is type V2. Keys that
+   *         should be null or empty are replaced with nullValue.
+   */
+  public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> rightOuterJoin(
+    final PCollection<KV<K, V1>> leftCollection,
+    final PCollection<KV<K, V2>> rightCollection,
+    final V1 nullValue) {
+    Preconditions.checkNotNull(leftCollection);
+    Preconditions.checkNotNull(rightCollection);
+    Preconditions.checkNotNull(nullValue);
+
+    final TupleTag<V1> v1Tuple = new TupleTag<>();
+    final TupleTag<V2> v2Tuple = new TupleTag<>();
+
+    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
+      KeyedPCollectionTuple.of(v1Tuple, leftCollection)
+        .and(v2Tuple, rightCollection)
+        .apply(CoGroupByKey.<K>create());
+
+    return coGbkResultCollection.apply(ParDo.of(
+      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+        @Override
+        public void processElement(ProcessContext c) {
+          KV<K, CoGbkResult> e = c.element();
+
+          Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
+          Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
+
+          for (V2 rightValue : rightValuesIterable) {
+            if (leftValuesIterable.iterator().hasNext()) {
+              for (V1 leftValue : leftValuesIterable) {
+                c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
+              }
+            } else {
+              c.output(KV.of(e.getKey(), KV.of(nullValue, rightValue)));
+            }
+          }
+        }
+      }))
+      .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
+                           KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
+                                      ((KvCoder) rightCollection.getCoder()).getValueCoder())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
new file mode 100644
index 0000000..99e9c4b
--- /dev/null
+++ b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.contrib.joinlibrary;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests the Inner Join functionality.
+ */
+public class InnerJoinTest {
+
+  Pipeline p;
+  List<KV<String, Long>> leftListOfKv;
+  List<KV<String, String>> listRightOfKv;
+  List<KV<String, KV<Long, String>>> expectedResult;
+
+  @Before
+  public void setup() {
+
+    p = TestPipeline.create();
+    leftListOfKv = new ArrayList<>();
+    listRightOfKv = new ArrayList<>();
+
+    expectedResult = new ArrayList<>();
+  }
+
+  @Test
+  public void testJoinOneToOneMapping() {
+    leftListOfKv.add(KV.of("Key1", 5L));
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection =
+        p.apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key1", "foo"));
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection =
+        p.apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+      leftCollection, rightCollection);
+
+    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToManyMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    listRightOfKv.add(KV.of("Key2", "gazonk"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+      leftCollection, rightCollection);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinManyToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    leftListOfKv.add(KV.of("Key2", 6L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+      leftCollection, rightCollection);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinNoneToNoneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key3", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+      leftCollection, rightCollection);
+
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+    p.run();
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinLeftCollectionNull() {
+    Join.innerJoin(null, p.apply(Create.of(listRightOfKv)));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinRightCollectionNull() {
+    Join.innerJoin(p.apply(Create.of(leftListOfKv)), null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
new file mode 100644
index 0000000..ca09136
--- /dev/null
+++ b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.contrib.joinlibrary;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Tests the Outer Left Join functionality.
+ */
+public class OuterLeftJoinTest {
+
+  Pipeline p;
+  List<KV<String, Long>> leftListOfKv;
+  List<KV<String, String>> listRightOfKv;
+  List<KV<String, KV<Long, String>>> expectedResult;
+
+  @Before
+  public void setup() {
+
+    p = TestPipeline.create();
+    leftListOfKv = new ArrayList<>();
+    listRightOfKv = new ArrayList<>();
+
+    expectedResult = new ArrayList<>();
+  }
+
+  @Test
+  public void testJoinOneToOneMapping() {
+    leftListOfKv.add(KV.of("Key1", 5L));
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key1", "foo"));
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+      leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToManyMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    listRightOfKv.add(KV.of("Key2", "gazonk"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+      leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinManyToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    leftListOfKv.add(KV.of("Key2", 6L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+      leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToNoneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key3", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+      leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+    p.run();
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinLeftCollectionNull() {
+    Join.leftOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinRightCollectionNull() {
+    Join.leftOuterJoin(p.apply(Create.of(leftListOfKv)), null, "");
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinNullValueIsNull() {
+    Join.leftOuterJoin(
+        p.apply("CreateLeft", Create.of(leftListOfKv)),
+        p.apply("CreateRight", Create.of(listRightOfKv)),
+        null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
new file mode 100644
index 0000000..86028ac
--- /dev/null
+++ b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.contrib.joinlibrary;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Tests the Outer Right Join functionality.
+ */
+public class OuterRightJoinTest {
+
+  Pipeline p;
+  List<KV<String, Long>> leftListOfKv;
+  List<KV<String, String>> listRightOfKv;
+  List<KV<String, KV<Long, String>>> expectedResult;
+
+  @Before
+  public void setup() {
+
+    p = TestPipeline.create();
+    leftListOfKv = new ArrayList<>();
+    listRightOfKv = new ArrayList<>();
+
+    expectedResult = new ArrayList<>();
+  }
+
+  @Test
+  public void testJoinOneToOneMapping() {
+    leftListOfKv.add(KV.of("Key1", 5L));
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key1", "foo"));
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+      leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToManyMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    listRightOfKv.add(KV.of("Key2", "gazonk"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+      leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinManyToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    leftListOfKv.add(KV.of("Key2", 6L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+      leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinNoneToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key3", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+      leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key3", KV.of(-1L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+    p.run();
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinLeftCollectionNull() {
+    Join.rightOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinRightCollectionNull() {
+    Join.rightOuterJoin(p.apply(Create.of(leftListOfKv)), null, -1L);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinNullValueIsNull() {
+    Join.rightOuterJoin(
+        p.apply("CreateLeft", Create.of(leftListOfKv)),
+        p.apply("CreateRight", Create.of(listRightOfKv)),
+        null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml
new file mode 100644
index 0000000..180b8b7
--- /dev/null
+++ b/sdks/java/extensions/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+  <!-- Parent POM is located through relativePath, i.e. the pom.xml one directory up. -->
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>java-sdk-parent</artifactId>
+    <version>0.1.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <!-- Packaged as 'pom', i.e. a parent/aggregator for the modules listed below. -->
+  <artifactId>extensions-parent</artifactId>
+  <packaging>pom</packaging>
+
+  <name>Apache Beam :: SDKs :: Java :: Extensions</name>
+  <!-- Sub-projects of this POM; join-library is the only one listed. -->
+  <modules>
+    <module>join-library</module>
+  </modules>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index 6bd7ee7..e5da2d5 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -41,6 +41,7 @@
          DataflowPipelineRunner. Until these are refactored out or
          a released artifact exists, we need to modify the build order.
     <module>maven-archetypes</module> -->
+    <module>extensions</module>
   </modules>
 
   <profiles>