You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/09/02 18:25:13 UTC

git commit: CRUNCH-258: Support multiple output channels from a DoFn. Contributed by Brandon Inman.

Updated Branches:
  refs/heads/master b7781ca08 -> 92888ff42


CRUNCH-258: Support multiple output channels from a DoFn. Contributed by Brandon Inman.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/92888ff4
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/92888ff4
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/92888ff4

Branch: refs/heads/master
Commit: 92888ff4260d1417baba66323e2dd2779f0105e4
Parents: b7781ca
Author: Josh Wills <jw...@apache.org>
Authored: Mon Sep 2 09:22:32 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Mon Sep 2 09:22:32 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/crunch/lib/Channels.java    | 102 +++++++++++++++++++
 .../org/apache/crunch/lib/ChannelsTest.java     |  59 +++++++++++
 2 files changed, 161 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/92888ff4/crunch-core/src/main/java/org/apache/crunch/lib/Channels.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Channels.java b/crunch-core/src/main/java/org/apache/crunch/lib/Channels.java
new file mode 100644
index 0000000..568ca20
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Channels.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+import org.apache.crunch.lib.Channels.FirstEmittingDoFn.SecondEmittingDoFn;
+import org.apache.crunch.types.PType;
+
+/**
+ * Utilities for splitting {@link Pair} instances emitted by {@link DoFn} into
+ * separate {@link PCollection} instances. A typical motivation for this might
+ * be to separate standard output from error output of a DoFn.
+ * 
+ * @author Brandon Inman
+ * 
+ */
+public class Channels {
+
+  /**
+   * Splits a {@link PCollection} of {@link Pair}s into a {@code Pair} of
+   * {@code PCollection}s, to allow for the output of a DoFn to be handled using
+   * separate channels.
+   *
+   * <p>The first two sub-types of the input collection's {@code PType} are used
+   * as the {@code PType}s of the first and second output collections.
+   * Null elements are dropped: a pair's first value reaches the first output
+   * only when it is non-null, and likewise for the second value.
+   *
+   * @param pCollection The {@code PCollection} to split
+   * @return {@link Pair} of {@link PCollection}: the non-null first values and
+   *         the non-null second values, respectively
+   */
+  public static <T, U> Pair<PCollection<T>, PCollection<U>> split(PCollection<Pair<T, U>> pCollection) {
+    PType<Pair<T, U>> pt = pCollection.getPType();
+    // Sub-type 0 feeds the first output, sub-type 1 the second.
+    return split(pCollection, pt.getSubTypes().get(0), pt.getSubTypes().get(1));
+  }
+
+  /**
+   * Splits a {@link PCollection} of {@link Pair}s into a {@code Pair} of
+   * {@code PCollection}s, to allow for the output of a DoFn to be handled using
+   * separate channels, using explicitly supplied {@code PType}s for the outputs.
+   *
+   * <p>Null elements are dropped: a pair's first value reaches the first output
+   * only when it is non-null, and likewise for the second value.
+   * 
+   * @param pCollection The {@code PCollection} to split
+   * @param firstPType The {@code PType} for the first collection
+   * @param secondPType The {@code PType} for the second collection
+   * @return {@link Pair} of {@link PCollection}: the non-null first values and
+   *         the non-null second values, respectively
+   */
+  public static <T, U> Pair<PCollection<T>, PCollection<U>> split(PCollection<Pair<T, U>> pCollection,
+      PType<T> firstPType, PType<U> secondPType) {
+
+    // Two independent parallelDo passes over the same input, one per channel.
+    PCollection<T> first = pCollection.parallelDo(new FirstEmittingDoFn<T, U>(), firstPType);
+    PCollection<U> second = pCollection.parallelDo(new SecondEmittingDoFn<T, U>(), secondPType);
+    return Pair.of(first, second);
+  }
+
+  /**
+   * DoFn that emits non-null first values in a {@link Pair}.
+   *
+   * @author Brandon Inman
+   * @param <T> type of the first value of each input pair (the emitted type)
+   * @param <U> type of the second value of each input pair (ignored)
+   */
+  static class FirstEmittingDoFn<T, U> extends DoFn<Pair<T, U>, T> {
+
+    @Override
+    public void process(Pair<T, U> input, Emitter<T> emitter) {
+      T first = input.first();
+      if (first != null) {
+        emitter.emit(first);
+      }
+    }
+
+    /**
+     * DoFn that emits non-null second values in a {@link Pair}.
+     *
+     * <p>NOTE(review): this class is nested inside {@code FirstEmittingDoFn}
+     * rather than being a sibling of it, and {@code Channels} imports it by
+     * that nested name. Un-nesting it also requires updating that import.
+     * 
+     * @author Brandon Inman
+     * @param <T> type of the first value of each input pair (ignored)
+     * @param <U> type of the second value of each input pair (the emitted type)
+     */
+    static class SecondEmittingDoFn<T, U> extends DoFn<Pair<T, U>, U> {
+
+      @Override
+      public void process(Pair<T, U> input, Emitter<U> emitter) {
+        U second = input.second();
+        if (second != null) {
+          emitter.emit(second);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/92888ff4/crunch-core/src/test/java/org/apache/crunch/lib/ChannelsTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/ChannelsTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/ChannelsTest.java
new file mode 100644
index 0000000..f278278
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/ChannelsTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collection;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+public class ChannelsTest {
+
+  /**
+   * Verifies that {@link Channels#split(PCollection)} routes every non-null first
+   * value to the first output and every non-null second value to the second
+   * output, discarding nulls on either side.
+   */
+  @Test
+  public void split() {
+    // Every combination of present and null values: (v, null), (null, v), (v, v), (null, null).
+    final ImmutableList<Pair<String, String>> input = ImmutableList.of(
+        Pair.of("One", (String) null),
+        Pair.of((String) null, "Two"),
+        Pair.of("Three", "Four"),
+        Pair.of((String) null, (String) null));
+    final PCollection<Pair<String, String>> pairs = MemPipeline.typedCollectionOf(
+        Writables.pairs(Writables.strings(), Writables.strings()), input);
+
+    final Pair<PCollection<String>, PCollection<String>> channels = Channels.split(pairs);
+
+    assertHoldsExactly(channels.first().asCollection().getValue(), "One", "Three");
+    assertHoldsExactly(channels.second().asCollection().getValue(), "Two", "Four");
+  }
+
+  /**
+   * Asserts that {@code actual} has exactly as many entries as {@code expected}
+   * and contains each expected value, in any order.
+   */
+  private static void assertHoldsExactly(Collection<String> actual, String... expected) {
+    assertEquals(expected.length, actual.size());
+    for (String value : expected) {
+      assertTrue(actual.contains(value));
+    }
+  }
+}