You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/09/02 18:25:13 UTC
git commit: CRUNCH-258: Support multiple output channels from a DoFn.
Contributed by Brandon Inman.
Updated Branches:
refs/heads/master b7781ca08 -> 92888ff42
CRUNCH-258: Support multiple output channels from a DoFn. Contributed by Brandon Inman.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/92888ff4
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/92888ff4
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/92888ff4
Branch: refs/heads/master
Commit: 92888ff4260d1417baba66323e2dd2779f0105e4
Parents: b7781ca
Author: Josh Wills <jw...@apache.org>
Authored: Mon Sep 2 09:22:32 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Mon Sep 2 09:22:32 2013 -0700
----------------------------------------------------------------------
.../java/org/apache/crunch/lib/Channels.java | 102 +++++++++++++++++++
.../org/apache/crunch/lib/ChannelsTest.java | 59 +++++++++++
2 files changed, 161 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/92888ff4/crunch-core/src/main/java/org/apache/crunch/lib/Channels.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Channels.java b/crunch-core/src/main/java/org/apache/crunch/lib/Channels.java
new file mode 100644
index 0000000..568ca20
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Channels.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+
+/**
+ * Utilities for splitting {@link Pair} instances emitted by {@link DoFn} into
+ * separate {@link PCollection} instances. A typical motivation for this might
+ * be to separate standard output from error output of a DoFn.
+ *
+ * @author Brandon Inman
+ */
+public class Channels {
+
+  /**
+   * Splits a {@link PCollection} of any {@link Pair} of objects into a Pair of
+   * {@link PCollection}s, to allow for the output of a DoFn to be handled using
+   * separate channels. Null components of each pair are dropped, and the output
+   * {@code PType}s are taken from the sub-types of the input's {@code PType}.
+   *
+   * @param pCollection The {@code PCollection} to split
+   * @return {@link Pair} of {@link PCollection}
+   */
+  public static <T, U> Pair<PCollection<T>, PCollection<U>> split(PCollection<Pair<T, U>> pCollection) {
+    PType<Pair<T, U>> pt = pCollection.getPType();
+    return split(pCollection, pt.getSubTypes().get(0), pt.getSubTypes().get(1));
+  }
+
+  /**
+   * Splits a {@link PCollection} of any {@link Pair} of objects into a Pair of
+   * {@link PCollection}s, to allow for the output of a DoFn to be handled using
+   * separate channels. Null components of each pair are dropped.
+   *
+   * @param pCollection The {@code PCollection} to split
+   * @param firstPType The {@code PType} for the first collection
+   * @param secondPType The {@code PType} for the second collection
+   * @return {@link Pair} of {@link PCollection}
+   */
+  public static <T, U> Pair<PCollection<T>, PCollection<U>> split(PCollection<Pair<T, U>> pCollection,
+      PType<T> firstPType, PType<U> secondPType) {
+    // Each channel is produced by its own parallelDo over the same input.
+    PCollection<T> first = pCollection.parallelDo(new FirstEmittingDoFn<T, U>(), firstPType);
+    PCollection<U> second = pCollection.parallelDo(new SecondEmittingDoFn<T, U>(), secondPType);
+    return Pair.of(first, second);
+  }
+
+  /**
+   * DoFn that emits non-null first values in a {@link Pair}.
+   *
+   * @author Brandon Inman
+   * @param <T> type of the first element, which is emitted
+   * @param <U> type of the second element, which is ignored
+   */
+  static class FirstEmittingDoFn<T, U> extends DoFn<Pair<T, U>, T> {
+
+    @Override
+    public void process(Pair<T, U> input, Emitter<T> emitter) {
+      T first = input.first();
+      if (first != null) {
+        emitter.emit(first);
+      }
+    }
+  }
+
+  /**
+   * DoFn that emits non-null second values in a {@link Pair}.
+   *
+   * @author Brandon Inman
+   * @param <T> type of the first element, which is ignored
+   * @param <U> type of the second element, which is emitted
+   */
+  static class SecondEmittingDoFn<T, U> extends DoFn<Pair<T, U>, U> {
+
+    @Override
+    public void process(Pair<T, U> input, Emitter<U> emitter) {
+      U second = input.second();
+      if (second != null) {
+        emitter.emit(second);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/92888ff4/crunch-core/src/test/java/org/apache/crunch/lib/ChannelsTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/ChannelsTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/ChannelsTest.java
new file mode 100644
index 0000000..f278278
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/ChannelsTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collection;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+public class ChannelsTest {
+
+  /**
+   * Verifies that non-null values in a {@code PCollection<Pair<>>} are routed to
+   * the matching half of the resulting {@code Pair<PCollection<>,PCollection<>>}.
+   */
+  @Test
+  public void split() {
+    // One pair of each shape: first only, second only, both, and neither.
+    final PCollection<Pair<String, String>> input = MemPipeline.typedCollectionOf(
+        Writables.pairs(Writables.strings(), Writables.strings()),
+        ImmutableList.of(Pair.of("One", (String) null), Pair.of((String) null, "Two"),
+            Pair.of("Three", "Four"), Pair.of((String) null, (String) null)));
+
+    final Pair<PCollection<String>, PCollection<String>> channels = Channels.split(input);
+    final Collection<String> firsts = channels.first().asCollection().getValue();
+    final Collection<String> seconds = channels.second().asCollection().getValue();
+
+    // Each channel keeps only the non-null values from its side of the pairs.
+    assertEquals(2, firsts.size());
+    assertTrue(firsts.contains("One"));
+    assertTrue(firsts.contains("Three"));
+    assertEquals(2, seconds.size());
+    assertTrue(seconds.contains("Two"));
+    assertTrue(seconds.contains("Four"));
+  }
+}