You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/01/08 05:28:09 UTC
git commit: CRUNCH-128: Add an explicit dependency between the output
of one MapReduce job and the start of another one for cases like map-side
joins and total orderings, where we need a file to exist on the filesystem
before another process takes advantage of it.
Updated Branches:
refs/heads/master 2bf556177 -> 2bc04f98c
CRUNCH-128: Add an explicit dependency between the output of one MapReduce job
and the start of another one for cases like map-side joins and total orderings,
where we need a file to exist on the filesystem before another process takes
advantage of it.
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/2bc04f98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/2bc04f98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/2bc04f98
Branch: refs/heads/master
Commit: 2bc04f98c1b874f4039ff405b5fd50098bd67447
Parents: 2bf5561
Author: Josh Wills <jw...@apache.org>
Authored: Sun Dec 9 19:25:53 2012 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Mon Jan 7 19:06:35 2013 -0800
----------------------------------------------------------------------
.../org/apache/crunch/lib/join/MapsideJoinIT.java | 56 +++++--
.../main/java/org/apache/crunch/PCollection.java | 34 ++++
.../java/org/apache/crunch/ParallelDoOptions.java | 62 ++++++++
.../org/apache/crunch/impl/mem/MemPipeline.java | 1 +
.../crunch/impl/mem/collect/MemCollection.java | 13 ++
.../crunch/impl/mr/collect/DoCollectionImpl.java | 11 ++-
.../apache/crunch/impl/mr/collect/DoTableImpl.java | 8 +-
.../crunch/impl/mr/collect/PCollectionImpl.java | 35 ++++-
.../apache/crunch/impl/mr/collect/PTableBase.java | 5 +
.../apache/crunch/impl/mr/plan/MSCRPlanner.java | 121 ++++++++++-----
.../org/apache/crunch/io/ReadableSourceTarget.java | 1 +
.../org/apache/crunch/lib/join/MapsideJoin.java | 72 ++++++----
.../crunch/materialize/MaterializableIterable.java | 9 +
13 files changed, 341 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
index 9147baf..7d5d94d 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
@@ -29,7 +29,9 @@ import org.apache.crunch.MapFn;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineResult;
import org.apache.crunch.fn.FilterFns;
+import org.apache.crunch.fn.MapValuesFn;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.test.TemporaryPath;
@@ -64,21 +66,33 @@ public class MapsideJoinIT {
}
private static class LineSplitter extends MapFn<String, Pair<Integer, String>> {
-
@Override
public Pair<Integer, String> map(String input) {
String[] fields = input.split("\\|");
return Pair.of(Integer.parseInt(fields[0]), fields[1]);
}
-
}
+ private static class CapOrdersFn extends MapValuesFn<Integer, String, String> {
+ @Override
+ public String map(String v) {
+ return v.toUpperCase();
+ }
+ }
+
+ private static class ConcatValuesFn extends MapValuesFn<Integer, Pair<String, String>, String> {
+ @Override
+ public String map(Pair<String, String> v) {
+ return v.toString();
+ }
+ }
+
@Rule
public TemporaryPath tmpDir = TemporaryPaths.create();
- @Test(expected = CrunchRuntimeException.class)
- public void testNonMapReducePipeline() {
- runMapsideJoin(MemPipeline.getInstance());
+ @Test
+ public void testMapSideJoin_MemPipeline() {
+ runMapsideJoin(MemPipeline.getInstance(), true);
}
@Test
@@ -95,27 +109,39 @@ public class MapsideJoinIT {
List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined.materialize());
assertTrue(materializedJoin.isEmpty());
-
}
@Test
public void testMapsideJoin() throws IOException {
- runMapsideJoin(new MRPipeline(MapsideJoinIT.class, tmpDir.getDefaultConfiguration()));
+ runMapsideJoin(new MRPipeline(MapsideJoinIT.class, tmpDir.getDefaultConfiguration()), false);
}
- private void runMapsideJoin(Pipeline pipeline) {
+ private void runMapsideJoin(Pipeline pipeline, boolean inMemory) {
PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+ PTable<Integer, String> custOrders = MapsideJoin.join(customerTable, orderTable)
+ .parallelDo("concat", new ConcatValuesFn(), Writables.tableOf(Writables.ints(), Writables.strings()));
- PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable, orderTable);
+ PTable<Integer, String> ORDER_TABLE = orderTable.parallelDo(new CapOrdersFn(), orderTable.getPTableType());
+
+ PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(custOrders, ORDER_TABLE);
List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
- expectedJoinResult.add(Pair.of(111, Pair.of("John Doe", "Corn flakes")));
- expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet paper")));
- expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet plunger")));
- expectedJoinResult.add(Pair.of(333, Pair.of("Someone Else", "Toilet brush")));
-
- List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(joined.materialize());
+ expectedJoinResult.add(Pair.of(111, Pair.of("[John Doe,Corn flakes]", "CORN FLAKES")));
+ expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PAPER")));
+ expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PLUNGER")));
+ expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PAPER")));
+ expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PLUNGER")));
+ expectedJoinResult.add(Pair.of(333, Pair.of("[Someone Else,Toilet brush]", "TOILET BRUSH")));
+ Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize();
+
+ PipelineResult res = pipeline.run();
+ if (!inMemory) {
+ assertEquals(2, res.getStageResults().size());
+ }
+
+ List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter);
Collections.sort(joinedResultList);
assertEquals(expectedJoinResult, joinedResultList);
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/PCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PCollection.java b/crunch/src/main/java/org/apache/crunch/PCollection.java
index 00c300f..798c262 100644
--- a/crunch/src/main/java/org/apache/crunch/PCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/PCollection.java
@@ -65,6 +65,23 @@ public interface PCollection<S> {
* @return a new {@code PCollection}
*/
<T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type);
+
+ /**
+ * Applies the given doFn to the elements of this {@code PCollection} and
+ * returns a new {@code PCollection} that is the output of this processing.
+ *
+ * @param name
+ * An identifier for this processing step, useful for debugging
+ * @param doFn
+ * The {@code DoFn} to apply
+ * @param type
+ * The {@link PType} of the resulting {@code PCollection}
+ * @param options
+ * Optional information that is needed for certain pipeline operations
+ * @return a new {@code PCollection}
+ */
+ <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type,
+ ParallelDoOptions options);
/**
* Similar to the other {@code parallelDo} instance, but returns a
@@ -91,6 +108,23 @@ public interface PCollection<S> {
* @return a new {@code PTable}
*/
<K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type);
+
+ /**
+ * Similar to the other {@code parallelDo} instance, but returns a
+ * {@code PTable} instance instead of a {@code PCollection}.
+ *
+ * @param name
+ * An identifier for this processing step
+ * @param doFn
+ * The {@code DoFn} to apply
+ * @param type
+ * The {@link PTableType} of the resulting {@code PTable}
+ * @param options
+ * Optional information that is needed for certain pipeline operations
+ * @return a new {@code PTable}
+ */
+ <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type,
+ ParallelDoOptions options);
/**
* Write the contents of this {@code PCollection} to the given {@code Target},
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/ParallelDoOptions.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/ParallelDoOptions.java b/crunch/src/main/java/org/apache/crunch/ParallelDoOptions.java
new file mode 100644
index 0000000..2407b3a
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/ParallelDoOptions.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.util.Collections;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Container class that includes optional information about a {@code parallelDo} operation
+ * applied to a {@code PCollection}. Primarily used within the Crunch framework
+ * itself for certain types of advanced processing operations, such as in-memory joins
+ * that require reading a file from the filesystem into a {@code DoFn}.
+ */
+public class ParallelDoOptions {
+ private final Set<SourceTarget<?>> sourceTargets;
+
+ private ParallelDoOptions(Set<SourceTarget<?>> sourceTargets) {
+ this.sourceTargets = sourceTargets;
+ }
+
+ public Set<SourceTarget<?>> getSourceTargets() {
+ return sourceTargets;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private Set<SourceTarget<?>> sourceTargets;
+
+ public Builder() {
+ this.sourceTargets = Sets.newHashSet();
+ }
+
+ public Builder sourceTargets(SourceTarget<?>... sourceTargets) {
+ Collections.addAll(this.sourceTargets, sourceTargets);
+ return this;
+ }
+
+ public ParallelDoOptions build() {
+ return new ParallelDoOptions(sourceTargets);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 77c41ce..3e28a0c 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -28,6 +28,7 @@ import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.Source;
+import org.apache.crunch.SourceTarget;
import org.apache.crunch.TableSource;
import org.apache.crunch.Target;
import org.apache.crunch.impl.mem.collect.MemCollection;
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index 35f64ce..ffc38ae 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -31,6 +31,7 @@ import org.apache.crunch.PCollection;
import org.apache.crunch.PObject;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
import org.apache.crunch.Pipeline;
import org.apache.crunch.Target;
import org.apache.crunch.fn.ExtractKeyFn;
@@ -95,6 +96,12 @@ public class MemCollection<S> implements PCollection<S> {
@Override
public <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type) {
+ return parallelDo(name, doFn, type, ParallelDoOptions.builder().build());
+ }
+
+ @Override
+ public <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type,
+ ParallelDoOptions options) {
InMemoryEmitter<T> emitter = new InMemoryEmitter<T>();
doFn.initialize();
for (S s : collect) {
@@ -111,6 +118,12 @@ public class MemCollection<S> implements PCollection<S> {
@Override
public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type) {
+ return parallelDo(name, doFn, type, ParallelDoOptions.builder().build());
+ }
+
+ @Override
+ public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type,
+ ParallelDoOptions options) {
InMemoryEmitter<Pair<K, V>> emitter = new InMemoryEmitter<Pair<K, V>>();
doFn.setContext(getInMemoryContext(getPipeline().getConfiguration()));
doFn.initialize();
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
index 1f4fea2..7b8f2ea 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
@@ -18,12 +18,16 @@
package org.apache.crunch.impl.mr.collect;
import java.util.List;
+import java.util.Set;
import org.apache.crunch.DoFn;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.SourceTarget;
import org.apache.crunch.impl.mr.plan.DoNode;
import org.apache.crunch.types.PType;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
public class DoCollectionImpl<S> extends PCollectionImpl<S> {
@@ -32,7 +36,12 @@ public class DoCollectionImpl<S> extends PCollectionImpl<S> {
private final PType<S> ntype;
<T> DoCollectionImpl(String name, PCollectionImpl<T> parent, DoFn<T, S> fn, PType<S> ntype) {
- super(name);
+ this(name, parent, fn, ntype, ParallelDoOptions.builder().build());
+ }
+
+ <T> DoCollectionImpl(String name, PCollectionImpl<T> parent, DoFn<T, S> fn, PType<S> ntype,
+ ParallelDoOptions options) {
+ super(name, options);
this.parent = (PCollectionImpl<Object>) parent;
this.fn = (DoFn<Object, S>) fn;
this.ntype = ntype;
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
index 1d19580..176643b 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
@@ -23,6 +23,7 @@ import org.apache.crunch.CombineFn;
import org.apache.crunch.DoFn;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
import org.apache.crunch.impl.mr.plan.DoNode;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
@@ -36,7 +37,12 @@ public class DoTableImpl<K, V> extends PTableBase<K, V> implements PTable<K, V>
private final PTableType<K, V> type;
<S> DoTableImpl(String name, PCollectionImpl<S> parent, DoFn<S, Pair<K, V>> fn, PTableType<K, V> ntype) {
- super(name);
+ this(name, parent, fn, ntype, ParallelDoOptions.builder().build());
+ }
+
+ <S> DoTableImpl(String name, PCollectionImpl<S> parent, DoFn<S, Pair<K, V>> fn, PTableType<K, V> ntype,
+ ParallelDoOptions options) {
+ super(name, options);
this.parent = parent;
this.fn = fn;
this.type = ntype;
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
index 79fe72b..8ad6692 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
@@ -20,6 +20,7 @@ package org.apache.crunch.impl.mr.collect;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,6 +31,7 @@ import org.apache.crunch.PCollection;
import org.apache.crunch.PObject;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
import org.apache.crunch.Pipeline;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.Target;
@@ -43,6 +45,7 @@ import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
public abstract class PCollectionImpl<S> implements PCollection<S> {
@@ -51,9 +54,15 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
private final String name;
protected MRPipeline pipeline;
private SourceTarget<S> materializedAt;
-
+ private final ParallelDoOptions options;
+
public PCollectionImpl(String name) {
+ this(name, ParallelDoOptions.builder().build());
+ }
+
+ public PCollectionImpl(String name, ParallelDoOptions options) {
this.name = name;
+ this.options = options;
}
@Override
@@ -86,7 +95,13 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type) {
return new DoCollectionImpl<T>(name, getChainingCollection(), fn, type);
}
-
+
+ @Override
+ public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type,
+ ParallelDoOptions options) {
+ return new DoCollectionImpl<T>(name, getChainingCollection(), fn, type, options);
+ }
+
@Override
public <K, V> PTable<K, V> parallelDo(DoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
MRPipeline pipeline = (MRPipeline) getPipeline();
@@ -98,6 +113,12 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
return new DoTableImpl<K, V>(name, getChainingCollection(), fn, type);
}
+ @Override
+ public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> fn, PTableType<K, V> type,
+ ParallelDoOptions options) {
+ return new DoTableImpl<K, V>(name, getChainingCollection(), fn, type, options);
+ }
+
public PCollection<S> write(Target target) {
if (materializedAt != null) {
getPipeline().write(new InputCollection<S>(materializedAt, (MRPipeline) getPipeline()), target);
@@ -194,7 +215,15 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
}
return pipeline;
}
-
+
+ public Set<SourceTarget<?>> getTargetDependencies() {
+ Set<SourceTarget<?>> targetDeps = options.getSourceTargets();
+ for (PCollectionImpl<?> parent : getParents()) {
+ targetDeps = Sets.union(targetDeps, parent.getTargetDependencies());
+ }
+ return targetDeps;
+ }
+
public int getDepth() {
int parentMax = 0;
for (PCollectionImpl parent : getParents()) {
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
index 03c2fdc..69ea8a3 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
@@ -27,6 +27,7 @@ import org.apache.crunch.PCollection;
import org.apache.crunch.PObject;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
import org.apache.crunch.Target;
import org.apache.crunch.lib.Aggregate;
import org.apache.crunch.lib.Cogroup;
@@ -44,6 +45,10 @@ abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> implements P
super(name);
}
+ public PTableBase(String name, ParallelDoOptions options) {
+ super(name, options);
+ }
+
public PType<K> getKeyType() {
return getPTableType().getKeyType();
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index 7fe2809..3718ec2 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
@@ -35,14 +36,23 @@ import org.apache.hadoop.conf.Configuration;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
public class MSCRPlanner {
+ private final MRPipeline pipeline;
+ private final Map<PCollectionImpl<?>, Set<Target>> outputs;
+
+ public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs) {
+ this.pipeline = pipeline;
+ this.outputs = new TreeMap<PCollectionImpl<?>, Set<Target>>(DEPTH_COMPARATOR);
+ this.outputs.putAll(outputs);
+ }
+
// Used to ensure that we always build pipelines starting from the deepest
- // outputs, which
- // helps ensure that we handle intermediate outputs correctly.
+ // outputs, which helps ensure that we handle intermediate outputs correctly.
private static final Comparator<PCollectionImpl<?>> DEPTH_COMPARATOR = new Comparator<PCollectionImpl<?>>() {
@Override
public int compare(PCollectionImpl<?> left, PCollectionImpl<?> right) {
@@ -50,55 +60,88 @@ public class MSCRPlanner {
if (cmp == 0) {
// Ensure we don't throw away two output collections at the same depth.
// Using the collection name would be nicer here, but names aren't
- // necessarily unique
+ // necessarily unique.
cmp = new Integer(right.hashCode()).compareTo(left.hashCode());
}
return cmp;
}
};
- private final MRPipeline pipeline;
- private final Map<PCollectionImpl<?>, Set<Target>> outputs;
-
- public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs) {
- this.pipeline = pipeline;
- this.outputs = new TreeMap<PCollectionImpl<?>, Set<Target>>(DEPTH_COMPARATOR);
- this.outputs.putAll(outputs);
- }
-
public MRExecutor plan(Class<?> jarClass, Configuration conf) throws IOException {
- // Walk the current plan tree and build a graph in which the vertices are
- // sources, targets, and GBK operations.
- GraphBuilder graphBuilder = new GraphBuilder();
- for (PCollectionImpl<?> output : outputs.keySet()) {
- graphBuilder.visitOutput(output);
+ Map<PCollectionImpl<?>, Set<SourceTarget<?>>> targetDeps = Maps.newTreeMap(DEPTH_COMPARATOR);
+ for (PCollectionImpl<?> pcollect : outputs.keySet()) {
+ targetDeps.put(pcollect, pcollect.getTargetDependencies());
}
- Graph baseGraph = graphBuilder.getGraph();
-
- // Create a new graph that splits up up dependent GBK nodes.
- Graph graph = prepareFinalGraph(baseGraph);
-
- // Break the graph up into connected components.
- List<List<Vertex>> components = graph.connectedComponents();
-
- // For each component, we will create one or more job prototypes,
- // depending on its profile.
- // For dependency handling, we only need to care about which
- // job prototype a particular GBK is assigned to.
Multimap<Vertex, JobPrototype> assignments = HashMultimap.create();
- for (List<Vertex> component : components) {
- assignments.putAll(constructJobPrototypes(component));
+ Multimap<PCollectionImpl<?>, Vertex> protoDependency = HashMultimap.create();
+ while (!targetDeps.isEmpty()) {
+ Set<Target> allTargets = Sets.newHashSet();
+ for (PCollectionImpl<?> pcollect : targetDeps.keySet()) {
+ allTargets.addAll(outputs.get(pcollect));
+ }
+ GraphBuilder graphBuilder = new GraphBuilder();
+
+ // Walk the current plan tree and build a graph in which the vertices are
+ // sources, targets, and GBK operations.
+ Set<PCollectionImpl<?>> currentStage = Sets.newHashSet();
+ Set<PCollectionImpl<?>> laterStage = Sets.newHashSet();
+ for (PCollectionImpl<?> output : targetDeps.keySet()) {
+ if (Sets.intersection(allTargets, targetDeps.get(output)).isEmpty()) {
+ graphBuilder.visitOutput(output);
+ currentStage.add(output);
+ } else {
+ laterStage.add(output);
+ }
+ }
+
+ Graph baseGraph = graphBuilder.getGraph();
+
+ // Create a new graph that splits up up dependent GBK nodes.
+ Graph graph = prepareFinalGraph(baseGraph);
+
+ // Break the graph up into connected components.
+ List<List<Vertex>> components = graph.connectedComponents();
+
+ // For each component, we will create one or more job prototypes,
+ // depending on its profile.
+ // For dependency handling, we only need to care about which
+ // job prototype a particular GBK is assigned to.
+ for (List<Vertex> component : components) {
+ assignments.putAll(constructJobPrototypes(component));
+ }
+
+ // Add in the job dependency information here.
+ for (Map.Entry<Vertex, JobPrototype> e : assignments.entries()) {
+ JobPrototype current = e.getValue();
+ List<Vertex> parents = graph.getParents(e.getKey());
+ for (Vertex parent : parents) {
+ for (JobPrototype parentJobProto : assignments.get(parent)) {
+ current.addDependency(parentJobProto);
+ }
+ }
+ }
+
+ // Add cross-stage dependencies.
+ for (PCollectionImpl<?> output : currentStage) {
+ Set<Target> targets = outputs.get(output);
+ Vertex vertex = graph.getVertexAt(output);
+ for (PCollectionImpl<?> later : laterStage) {
+ if (!Sets.intersection(targets, targetDeps.get(later)).isEmpty()) {
+ protoDependency.put(later, vertex);
+ }
+ }
+ targetDeps.remove(output);
+ }
}
-
- // Add in the job dependency information here.
- for (Map.Entry<Vertex, JobPrototype> e : assignments.entries()) {
- JobPrototype current = e.getValue();
- List<Vertex> parents = graph.getParents(e.getKey());
- for (Vertex parent : parents) {
- for (JobPrototype parentJobProto : assignments.get(parent)) {
- current.addDependency(parentJobProto);
+ // Cross-job dependencies.
+ for (Entry<PCollectionImpl<?>, Vertex> pd : protoDependency.entries()) {
+ Vertex d = new Vertex(pd.getKey());
+ Vertex dj = pd.getValue();
+ for (JobPrototype parent : assignments.get(dj)) {
+ for (JobPrototype child : assignments.get(d)) {
+ child.addDependency(parent);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
index 95c90aa..ac979c3 100644
--- a/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
@@ -18,6 +18,7 @@
package org.apache.crunch.io;
import org.apache.crunch.SourceTarget;
+import org.apache.hadoop.fs.Path;
/**
* An interface that indicates that a {@code SourceTarget} instance can be read
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java b/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
index 1acbf2d..8116ea1 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
@@ -24,16 +24,18 @@ import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
-import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.ParallelDoOptions;
import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.crunch.io.impl.SourcePathTargetImpl;
+import org.apache.crunch.materialize.MaterializableIterable;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
/**
@@ -64,37 +66,46 @@ public class MapsideJoin {
* @return A table keyed on the join key, containing pairs of joined values
*/
public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right) {
-
- if (!(right.getPipeline() instanceof MRPipeline)) {
- throw new CrunchRuntimeException("Map-side join is only supported within a MapReduce context");
+ PTypeFamily tf = left.getTypeFamily();
+ Iterable<Pair<K, V>> iterable = right.materialize();
+
+ if (iterable instanceof MaterializableIterable) {
+ MaterializableIterable<Pair<K, V>> mi = (MaterializableIterable<Pair<K, V>>) iterable;
+ MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(mi.getPath().toString(), right.getPType());
+ ParallelDoOptions options = ParallelDoOptions.builder()
+ .sourceTargets(mi.getSourceTarget())
+ .build();
+ return left.parallelDo("mapjoin", mapJoinDoFn,
+ tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())),
+ options);
+ } else { // in-memory pipeline
+ return left.parallelDo(new InMemoryJoinFn<K, U, V>(iterable),
+ tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())));
}
+ }
- MRPipeline pipeline = (MRPipeline) right.getPipeline();
- pipeline.materialize(right);
-
- // TODO Move necessary logic to MRPipeline so that we can theoretically
- // optimize his by running the setup of multiple map-side joins concurrently
- pipeline.run();
+ static class InMemoryJoinFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
- ReadableSourceTarget<Pair<K, V>> readableSourceTarget = pipeline.getMaterializeSourceTarget(right);
- if (!(readableSourceTarget instanceof SourcePathTargetImpl)) {
- throw new CrunchRuntimeException("Right-side contents can't be read from a path");
+ private Multimap<K, V> joinMap;
+
+ public InMemoryJoinFn(Iterable<Pair<K, V>> iterable) {
+ joinMap = HashMultimap.create();
+ for (Pair<K, V> joinPair : iterable) {
+ joinMap.put(joinPair.first(), joinPair.second());
+ }
+ }
+
+ @Override
+ public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
+ K key = input.first();
+ U value = input.second();
+ for (V joinValue : joinMap.get(key)) {
+ Pair<U, V> valuePair = Pair.of(value, joinValue);
+ emitter.emit(Pair.of(key, valuePair));
+ }
}
-
- // Suppress warnings because we've just checked this cast via instanceof
- @SuppressWarnings("unchecked")
- SourcePathTargetImpl<Pair<K, V>> sourcePathTarget = (SourcePathTargetImpl<Pair<K, V>>) readableSourceTarget;
-
- Path path = sourcePathTarget.getPath();
- DistributedCache.addCacheFile(path.toUri(), pipeline.getConfiguration());
-
- MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(path.getName(), right.getPType());
- PTypeFamily typeFamily = left.getTypeFamily();
- return left.parallelDo("mapjoin", mapJoinDoFn,
- typeFamily.tableOf(left.getKeyType(), typeFamily.pairs(left.getValueType(), right.getValueType())));
-
}
-
+
static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
private String inputPath;
@@ -122,6 +133,11 @@ public class MapsideJoin {
}
@Override
+ public void configure(Configuration conf) {
+ DistributedCache.addCacheFile(new Path(inputPath).toUri(), conf);
+ }
+
+ @Override
public void initialize() {
super.initialize();
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java b/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
index 2d6c573..0ed29e3 100644
--- a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
+++ b/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
@@ -24,7 +24,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.Pipeline;
+import org.apache.crunch.io.PathTarget;
import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.hadoop.fs.Path;
public class MaterializableIterable<E> implements Iterable<E> {
@@ -44,6 +46,13 @@ public class MaterializableIterable<E> implements Iterable<E> {
return sourceTarget;
}
+ public Path getPath() {
+ if (sourceTarget instanceof PathTarget) {
+ return ((PathTarget) sourceTarget).getPath();
+ }
+ return null;
+ }
+
@Override
public Iterator<E> iterator() {
if (materialized == null) {