You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:34 UTC
[32/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
new file mode 100644
index 0000000..f22b5a1
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Target;
+import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
+import org.apache.crunch.impl.mr.collect.DoTableImpl;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+import org.apache.crunch.impl.mr.exec.CrunchJobHooks;
+import org.apache.crunch.impl.mr.run.CrunchCombiner;
+import org.apache.crunch.impl.mr.run.CrunchInputFormat;
+import org.apache.crunch.impl.mr.run.CrunchMapper;
+import org.apache.crunch.impl.mr.run.CrunchReducer;
+import org.apache.crunch.impl.mr.run.NodeContext;
+import org.apache.crunch.impl.mr.run.RTNode;
+import org.apache.crunch.util.DistCache;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+class JobPrototype {
+
+ public static JobPrototype createMapReduceJob(int jobID, PGroupedTableImpl<?, ?> group,
+ Set<NodePath> inputs, Path workingPath) {
+ return new JobPrototype(jobID, inputs, group, workingPath);
+ }
+
+ public static JobPrototype createMapOnlyJob(int jobID, HashMultimap<Target, NodePath> mapNodePaths, Path workingPath) {
+ return new JobPrototype(jobID, mapNodePaths, workingPath);
+ }
+
+ private final int jobID; // TODO: maybe stageID sounds better
+ private final Set<NodePath> mapNodePaths;
+ private final PGroupedTableImpl<?, ?> group;
+ private final Set<JobPrototype> dependencies = Sets.newHashSet();
+ private final Map<PCollectionImpl<?>, DoNode> nodes = Maps.newHashMap();
+ private final Path workingPath;
+
+ private HashMultimap<Target, NodePath> targetsToNodePaths;
+ private DoTableImpl<?, ?> combineFnTable;
+
+ private CrunchControlledJob job;
+
+ private JobPrototype(int jobID, Set<NodePath> inputs, PGroupedTableImpl<?, ?> group, Path workingPath) {
+ this.jobID = jobID;
+ this.mapNodePaths = ImmutableSet.copyOf(inputs);
+ this.group = group;
+ this.workingPath = workingPath;
+ this.targetsToNodePaths = null;
+ }
+
+ private JobPrototype(int jobID, HashMultimap<Target, NodePath> outputPaths, Path workingPath) {
+ this.jobID = jobID;
+ this.group = null;
+ this.mapNodePaths = null;
+ this.workingPath = workingPath;
+ this.targetsToNodePaths = outputPaths;
+ }
+
+ public int getJobID() {
+ return jobID;
+ }
+
+ public boolean isMapOnly() {
+ return this.group == null;
+ }
+
+ Set<NodePath> getMapNodePaths() {
+ return mapNodePaths;
+ }
+
+ PGroupedTableImpl<?, ?> getGroupingTable() {
+ return group;
+ }
+
+ HashMultimap<Target, NodePath> getTargetsToNodePaths() {
+ return targetsToNodePaths;
+ }
+
+ public void addReducePaths(HashMultimap<Target, NodePath> outputPaths) {
+ if (group == null) {
+ throw new IllegalStateException("Cannot add a reduce phase to a map-only job");
+ }
+ this.targetsToNodePaths = outputPaths;
+ }
+
+ public void addDependency(JobPrototype dependency) {
+ this.dependencies.add(dependency);
+ }
+
+ public CrunchControlledJob getCrunchJob(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException {
+ if (job == null) {
+ job = build(jarClass, conf, pipeline);
+ for (JobPrototype proto : dependencies) {
+ job.addDependingJob(proto.getCrunchJob(jarClass, conf, pipeline));
+ }
+ }
+ return job;
+ }
+
+ private CrunchControlledJob build(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException {
+ Job job = new Job(conf);
+ conf = job.getConfiguration();
+ conf.set(PlanningParameters.CRUNCH_WORKING_DIRECTORY, workingPath.toString());
+ job.setJarByClass(jarClass);
+
+ Set<DoNode> outputNodes = Sets.newHashSet();
+ Set<Target> targets = targetsToNodePaths.keySet();
+ Path outputPath = new Path(workingPath, "output");
+ MSCROutputHandler outputHandler = new MSCROutputHandler(job, outputPath, group == null);
+ for (Target target : targets) {
+ DoNode node = null;
+ for (NodePath nodePath : targetsToNodePaths.get(target)) {
+ if (node == null) {
+ PCollectionImpl<?> collect = nodePath.tail();
+ node = DoNode.createOutputNode(target.toString(), collect.getPType());
+ outputHandler.configureNode(node, target);
+ }
+ outputNodes.add(walkPath(nodePath.descendingIterator(), node));
+ }
+ }
+
+ job.setMapperClass(CrunchMapper.class);
+ List<DoNode> inputNodes;
+ DoNode reduceNode = null;
+ if (group != null) {
+ job.setReducerClass(CrunchReducer.class);
+ List<DoNode> reduceNodes = Lists.newArrayList(outputNodes);
+ serialize(reduceNodes, conf, workingPath, NodeContext.REDUCE);
+ reduceNode = reduceNodes.get(0);
+
+ if (combineFnTable != null) {
+ job.setCombinerClass(CrunchCombiner.class);
+ DoNode combinerInputNode = group.createDoNode();
+ DoNode combineNode = combineFnTable.createDoNode();
+ combineNode.addChild(group.getGroupingNode());
+ combinerInputNode.addChild(combineNode);
+ serialize(ImmutableList.of(combinerInputNode), conf, workingPath, NodeContext.COMBINE);
+ }
+
+ group.configureShuffle(job);
+
+ DoNode mapOutputNode = group.getGroupingNode();
+ Set<DoNode> mapNodes = Sets.newHashSet();
+ for (NodePath nodePath : mapNodePaths) {
+ // Advance these one step, since we've already configured
+ // the grouping node, and the PGroupedTableImpl is the tail
+ // of the NodePath.
+ Iterator<PCollectionImpl<?>> iter = nodePath.descendingIterator();
+ iter.next();
+ mapNodes.add(walkPath(iter, mapOutputNode));
+ }
+ inputNodes = Lists.newArrayList(mapNodes);
+ } else { // No grouping
+ job.setNumReduceTasks(0);
+ inputNodes = Lists.newArrayList(outputNodes);
+ }
+ serialize(inputNodes, conf, workingPath, NodeContext.MAP);
+
+ if (inputNodes.size() == 1) {
+ DoNode inputNode = inputNodes.get(0);
+ inputNode.getSource().configureSource(job, -1);
+ } else {
+ for (int i = 0; i < inputNodes.size(); i++) {
+ DoNode inputNode = inputNodes.get(i);
+ inputNode.getSource().configureSource(job, i);
+ }
+ job.setInputFormatClass(CrunchInputFormat.class);
+ }
+ job.setJobName(createJobName(pipeline.getName(), inputNodes, reduceNode));
+
+ return new CrunchControlledJob(
+ jobID,
+ job,
+ new CrunchJobHooks.PrepareHook(job),
+ new CrunchJobHooks.CompletionHook(job, outputPath, outputHandler.getMultiPaths(), group == null));
+ }
+
+ private void serialize(List<DoNode> nodes, Configuration conf, Path workingPath, NodeContext context)
+ throws IOException {
+ List<RTNode> rtNodes = Lists.newArrayList();
+ for (DoNode node : nodes) {
+ rtNodes.add(node.toRTNode(true, conf, context));
+ }
+ Path path = new Path(workingPath, context.toString());
+ DistCache.write(conf, path, rtNodes);
+ }
+
+ private String createJobName(String pipelineName, List<DoNode> mapNodes, DoNode reduceNode) {
+ JobNameBuilder builder = new JobNameBuilder(pipelineName);
+ builder.visit(mapNodes);
+ if (reduceNode != null) {
+ builder.visit(reduceNode);
+ }
+ return builder.build();
+ }
+
+ private DoNode walkPath(Iterator<PCollectionImpl<?>> iter, DoNode working) {
+ while (iter.hasNext()) {
+ PCollectionImpl<?> collect = iter.next();
+ if (combineFnTable != null && !(collect instanceof PGroupedTableImpl)) {
+ combineFnTable = null;
+ } else if (collect instanceof DoTableImpl && ((DoTableImpl<?, ?>) collect).hasCombineFn()) {
+ combineFnTable = (DoTableImpl<?, ?>) collect;
+ }
+ if (!nodes.containsKey(collect)) {
+ nodes.put(collect, collect.createDoNode());
+ }
+ DoNode parent = nodes.get(collect);
+ parent.addChild(working);
+ working = parent;
+ }
+ return working;
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
new file mode 100644
index 0000000..36c565e
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.util.Map;
+
+import org.apache.crunch.Target;
+import org.apache.crunch.io.MapReduceTarget;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.io.PathTarget;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Registers the targets of a single planned job with its Hadoop {@link Job}. Each
+ * {@link MapReduceTarget} is given a unique named output
+ * ({@link PlanningParameters#MULTI_OUTPUT_PREFIX} followed by a counter). The name is recorded on
+ * the corresponding output {@link DoNode}. Path-based targets are remembered by their output
+ * index so they can be retrieved with {@link #getMultiPaths()}.
+ */
+public class MSCROutputHandler implements OutputHandler {
+
+  private final Job job;
+  private final Path path;
+  private final boolean mapOnlyJob;
+  // Output index -> path target, for outputs that are PathTargets.
+  private final Map<Integer, PathTarget> multiPaths = Maps.newHashMap();
+
+  // Node whose target is currently being configured; set by configureNode.
+  private DoNode workingNode;
+  // Number of named outputs assigned so far within this job.
+  private int outputCount;
+
+  public MSCROutputHandler(Job job, Path outputPath, boolean mapOnlyJob) {
+    this.job = job;
+    this.path = outputPath;
+    this.mapOnlyJob = mapOnlyJob;
+  }
+
+  /**
+   * Configures {@code target} as the destination of {@code node} by dispatching through
+   * {@link Target#accept}, which calls back into {@link #configure}.
+   */
+  public void configureNode(DoNode node, Target target) {
+    workingNode = node;
+    target.accept(this, node.getPType());
+  }
+
+  /**
+   * Assigns the next named output to {@code target} if it is a {@link MapReduceTarget}.
+   *
+   * @return true if the target was configured, false if it is not a MapReduceTarget
+   * @throws IllegalStateException if called for a MapReduceTarget before {@link #configureNode}
+   */
+  public boolean configure(Target target, PType<?> ptype) {
+    if (!(target instanceof MapReduceTarget)) {
+      return false;
+    }
+    // Fail before touching any state, rather than with a NullPointerException after the
+    // path map and counter have already been updated.
+    if (workingNode == null) {
+      throw new IllegalStateException("configureNode must be called before configuring target " + target);
+    }
+    if (target instanceof PathTarget) {
+      multiPaths.put(outputCount, (PathTarget) target);
+    }
+
+    String name = PlanningParameters.MULTI_OUTPUT_PREFIX + outputCount;
+    outputCount++;
+    workingNode.setOutputName(name);
+    ((MapReduceTarget) target).configureForMapReduce(job, ptype, path, name);
+    return true;
+  }
+
+  public boolean isMapOnlyJob() {
+    return mapOnlyJob;
+  }
+
+  /** Returns the live map from output index to path target (not a copy). */
+  public Map<Integer, PathTarget> getMultiPaths() {
+    return multiPaths;
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
new file mode 100644
index 0000000..3e1de38
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.collect.InputCollection;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+import org.apache.crunch.impl.mr.exec.MRExecutor;
+import org.apache.crunch.materialize.MaterializableIterable;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+/**
+ * Plans the MapReduce jobs needed to write a pipeline's outputs. Outputs are processed in
+ * stages: an output is planned only once none of the targets it reads from are still waiting to
+ * be written. Each stage's plan graph is split into connected components, which become one or
+ * more {@link JobPrototype}s linked by dependencies.
+ */
+public class MSCRPlanner {
+
+  private final MRPipeline pipeline;
+  // Outputs to write, ordered deepest-first; handleSplitTarget may add entries while planning.
+  private final Map<PCollectionImpl<?>, Set<Target>> outputs;
+  private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
+  // Last job ID handed out; IDs are pre-incremented, so the first job gets ID 1.
+  private int lastJobID = 0;
+
+  /**
+   * @param pipeline the pipeline being planned; used for temp paths, intermediate outputs and reads
+   * @param outputs collections to write mapped to their targets; copied into a depth-ordered map
+   * @param toMaterialize materialization requests, handed unchanged to the {@link MRExecutor}
+   */
+  public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs,
+      Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize) {
+    this.pipeline = pipeline;
+    this.outputs = new TreeMap<PCollectionImpl<?>, Set<Target>>(DEPTH_COMPARATOR);
+    this.outputs.putAll(outputs);
+    this.toMaterialize = toMaterialize;
+  }
+
+  // Used to ensure that we always build pipelines starting from the deepest
+  // outputs, which helps ensure that we handle intermediate outputs correctly.
+  // Ties are broken by hashCode so that distinct collections at the same depth are not
+  // collapsed into one TreeMap key. NOTE(review): two distinct collections with equal depth
+  // AND equal hashCode would still compare as equal and one would be dropped.
+  private static final Comparator<PCollectionImpl<?>> DEPTH_COMPARATOR = new Comparator<PCollectionImpl<?>>() {
+    @Override
+    public int compare(PCollectionImpl<?> left, PCollectionImpl<?> right) {
+      int cmp = compareInts(right.getDepth(), left.getDepth());
+      if (cmp == 0) {
+        // Using the collection name would be nicer here, but names aren't
+        // necessarily unique.
+        cmp = compareInts(right.hashCode(), left.hashCode());
+      }
+      return cmp;
+    }
+  };
+
+  /** Three-way int comparison without boxing or subtraction overflow. */
+  private static int compareInts(int a, int b) {
+    return a < b ? -1 : (a == b ? 0 : 1);
+  }
+
+  /**
+   * Plans every output and returns an executor holding one job per job prototype. The pipeline's
+   * DOT plan is stored on the executor and under {@link PlanningParameters#PIPELINE_PLAN_DOTFILE}
+   * in {@code conf}.
+   *
+   * @throws IllegalStateException if some outputs can never be scheduled because each of them
+   *     reads a target that is itself still waiting to be written
+   */
+  public MRExecutor plan(Class<?> jarClass, Configuration conf) throws IOException {
+    Map<PCollectionImpl<?>, Set<SourceTarget<?>>> targetDeps = Maps.newTreeMap(DEPTH_COMPARATOR);
+    for (PCollectionImpl<?> pcollect : outputs.keySet()) {
+      targetDeps.put(pcollect, pcollect.getTargetDependencies());
+    }
+
+    Multimap<Vertex, JobPrototype> assignments = HashMultimap.create();
+    Multimap<PCollectionImpl<?>, Vertex> protoDependency = HashMultimap.create();
+    while (!targetDeps.isEmpty()) {
+      Set<Target> allTargets = Sets.newHashSet();
+      for (PCollectionImpl<?> pcollect : targetDeps.keySet()) {
+        allTargets.addAll(outputs.get(pcollect));
+      }
+      GraphBuilder graphBuilder = new GraphBuilder();
+
+      // Walk the current plan tree and build a graph in which the vertices are
+      // sources, targets, and GBK operations. Outputs that read from a still-pending
+      // target are deferred to a later stage.
+      Set<PCollectionImpl<?>> currentStage = Sets.newHashSet();
+      Set<PCollectionImpl<?>> laterStage = Sets.newHashSet();
+      for (PCollectionImpl<?> output : targetDeps.keySet()) {
+        if (Sets.intersection(allTargets, targetDeps.get(output)).isEmpty()) {
+          graphBuilder.visitOutput(output);
+          currentStage.add(output);
+        } else {
+          laterStage.add(output);
+        }
+      }
+
+      // If nothing can be scheduled, this iteration would change no state and the loop
+      // would never terminate.
+      if (currentStage.isEmpty()) {
+        throw new IllegalStateException("Cannot plan pipeline: all " + laterStage.size()
+            + " remaining outputs depend on targets that have not been written yet"
+            + " (cyclic target dependencies?)");
+      }
+
+      Graph baseGraph = graphBuilder.getGraph();
+
+      // Create a new graph that splits up dependent GBK nodes.
+      Graph graph = prepareFinalGraph(baseGraph);
+
+      // Break the graph up into connected components.
+      List<List<Vertex>> components = graph.connectedComponents();
+
+      // For each component, we will create one or more job prototypes,
+      // depending on its profile.
+      // For dependency handling, we only need to care about which
+      // job prototype a particular GBK is assigned to.
+      for (List<Vertex> component : components) {
+        assignments.putAll(constructJobPrototypes(component));
+      }
+
+      // Add in the job dependency information here.
+      for (Map.Entry<Vertex, JobPrototype> e : assignments.entries()) {
+        JobPrototype current = e.getValue();
+        List<Vertex> parents = graph.getParents(e.getKey());
+        for (Vertex parent : parents) {
+          for (JobPrototype parentJobProto : assignments.get(parent)) {
+            current.addDependency(parentJobProto);
+          }
+        }
+      }
+
+      // Add cross-stage dependencies.
+      for (PCollectionImpl<?> output : currentStage) {
+        Set<Target> targets = outputs.get(output);
+        Vertex vertex = graph.getVertexAt(output);
+        for (PCollectionImpl<?> later : laterStage) {
+          if (!Sets.intersection(targets, targetDeps.get(later)).isEmpty()) {
+            protoDependency.put(later, vertex);
+          }
+        }
+        targetDeps.remove(output);
+      }
+    }
+
+    // Cross-job dependencies.
+    for (Entry<PCollectionImpl<?>, Vertex> pd : protoDependency.entries()) {
+      Vertex d = new Vertex(pd.getKey());
+      Vertex dj = pd.getValue();
+      for (JobPrototype parent : assignments.get(dj)) {
+        for (JobPrototype child : assignments.get(d)) {
+          child.addDependency(parent);
+        }
+      }
+    }
+
+    // Finally, construct the jobs from the prototypes and return.
+    DotfileWriter dotfileWriter = new DotfileWriter();
+    MRExecutor exec = new MRExecutor(jarClass, outputs, toMaterialize);
+    for (JobPrototype proto : Sets.newHashSet(assignments.values())) {
+      dotfileWriter.addJobPrototype(proto);
+      exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline));
+    }
+
+    String planDotFile = dotfileWriter.buildDotfile();
+    exec.setPlanDotFile(planDotFile);
+    conf.set(PlanningParameters.PIPELINE_PLAN_DOTFILE, planDotFile);
+
+    return exec;
+  }
+
+  /**
+   * Copies {@code baseGraph} into a new graph. Edges that would chain two GBKs, and edges from an
+   * output into a GBK, are split at an intermediate output that is written by one job and read
+   * back by the next.
+   */
+  private Graph prepareFinalGraph(Graph baseGraph) {
+    Graph graph = new Graph();
+
+    for (Vertex baseVertex : baseGraph) {
+      // Add all of the vertices in the base graph, but no edges (yet).
+      graph.addVertex(baseVertex.getPCollection(), baseVertex.isOutput());
+    }
+
+    for (Edge e : baseGraph.getAllEdges()) {
+      // Add back every edge except GBK -> GBK edges and output -> GBK edges;
+      // those are split below.
+      if (!(e.getHead().isGBK() && e.getTail().isGBK()) &&
+          !(e.getHead().isOutput() && e.getTail().isGBK())) {
+        Vertex head = graph.getVertexAt(e.getHead().getPCollection());
+        Vertex tail = graph.getVertexAt(e.getTail().getPCollection());
+        graph.getEdge(head, tail).addAllNodePaths(e.getNodePaths());
+      }
+    }
+
+    for (Vertex baseVertex : baseGraph) {
+      if (baseVertex.isGBK()) {
+        Vertex vertex = graph.getVertexAt(baseVertex.getPCollection());
+        for (Edge e : baseVertex.getIncomingEdges()) {
+          if (e.getHead().isOutput()) {
+            // Execute an edge split at the output feeding this GBK.
+            // NOTE(review): splitTail is the base-graph vertex rather than
+            // graph.getVertexAt(...); this relies on Graph treating the two as interchangeable.
+            Vertex splitTail = e.getHead();
+            PCollectionImpl<?> split = splitTail.getPCollection();
+            InputCollection<?> inputNode = handleSplitTarget(split);
+            Vertex splitHead = graph.addVertex(inputNode, false);
+
+            // Divide up each node path at the split output: the part up to the output stays
+            // on an edge with splitTail, the rest starts from the new input and feeds the GBK.
+            for (NodePath path : e.getNodePaths()) {
+              NodePath headPath = path.splitAt(split, splitHead.getPCollection());
+              graph.getEdge(vertex, splitTail).addNodePath(headPath);
+              graph.getEdge(splitHead, vertex).addNodePath(path);
+            }
+
+            // Note the dependency between the vertices in the graph.
+            graph.markDependency(splitHead, splitTail);
+          } else if (!e.getHead().isGBK()) {
+            Vertex newHead = graph.getVertexAt(e.getHead().getPCollection());
+            graph.getEdge(newHead, vertex).addAllNodePaths(e.getNodePaths());
+          }
+        }
+        for (Edge e : baseVertex.getOutgoingEdges()) {
+          if (!e.getTail().isGBK()) {
+            Vertex newTail = graph.getVertexAt(e.getTail().getPCollection());
+            graph.getEdge(vertex, newTail).addAllNodePaths(e.getNodePaths());
+          } else {
+            // Execute an Edge split
+            Vertex newGraphTail = graph.getVertexAt(e.getTail().getPCollection());
+            PCollectionImpl<?> split = e.getSplit();
+            InputCollection<?> inputNode = handleSplitTarget(split);
+            Vertex splitTail = graph.addVertex(split, true);
+            Vertex splitHead = graph.addVertex(inputNode, false);
+
+            // Divide up the node paths in the edge between the two GBK nodes so
+            // that each node is either owned by GBK1 -> newTail or newHead -> GBK2.
+            for (NodePath path : e.getNodePaths()) {
+              NodePath headPath = path.splitAt(split, splitHead.getPCollection());
+              graph.getEdge(vertex, splitTail).addNodePath(headPath);
+              graph.getEdge(splitHead, newGraphTail).addNodePath(path);
+            }
+
+            // Note the dependency between the vertices in the graph.
+            graph.markDependency(splitHead, splitTail);
+          }
+        }
+      }
+    }
+
+    return graph;
+  }
+
+  /**
+   * Creates the job prototypes for one connected component. A component without GBKs becomes a
+   * single map-only job. Otherwise each GBK becomes a map-reduce job, and any outputs not fully
+   * covered by those jobs are gathered into one extra map-only job.
+   *
+   * @return the prototype(s) assigned to each vertex of the component
+   * @throws IllegalStateException if a GBK-free component has no outputs
+   */
+  private Multimap<Vertex, JobPrototype> constructJobPrototypes(List<Vertex> component) {
+    Multimap<Vertex, JobPrototype> assignment = HashMultimap.create();
+    List<Vertex> gbks = Lists.newArrayList();
+    for (Vertex v : component) {
+      if (v.isGBK()) {
+        gbks.add(v);
+      }
+    }
+
+    if (gbks.isEmpty()) {
+      HashMultimap<Target, NodePath> outputPaths = HashMultimap.create();
+      for (Vertex v : component) {
+        if (v.isInput()) {
+          for (Edge e : v.getOutgoingEdges()) {
+            for (NodePath nodePath : e.getNodePaths()) {
+              PCollectionImpl<?> target = nodePath.tail();
+              for (Target t : outputs.get(target)) {
+                outputPaths.put(t, nodePath);
+              }
+            }
+          }
+        }
+      }
+      if (outputPaths.isEmpty()) {
+        throw new IllegalStateException("No outputs?");
+      }
+      JobPrototype prototype = JobPrototype.createMapOnlyJob(
+          ++lastJobID, outputPaths, pipeline.createTempPath());
+      for (Vertex v : component) {
+        assignment.put(v, prototype);
+      }
+    } else {
+      Set<Edge> usedEdges = Sets.newHashSet();
+      for (Vertex g : gbks) {
+        Set<NodePath> inputs = Sets.newHashSet();
+        for (Edge e : g.getIncomingEdges()) {
+          inputs.addAll(e.getNodePaths());
+          usedEdges.add(e);
+        }
+        JobPrototype prototype = JobPrototype.createMapReduceJob(
+            ++lastJobID, (PGroupedTableImpl<?, ?>) g.getPCollection(), inputs, pipeline.createTempPath());
+        assignment.put(g, prototype);
+        for (Edge e : g.getIncomingEdges()) {
+          assignment.put(e.getHead(), prototype);
+        }
+        HashMultimap<Target, NodePath> outputPaths = HashMultimap.create();
+        for (Edge e : g.getOutgoingEdges()) {
+          Vertex output = e.getTail();
+          for (Target t : outputs.get(output.getPCollection())) {
+            outputPaths.putAll(t, e.getNodePaths());
+          }
+          assignment.put(output, prototype);
+          usedEdges.add(e);
+        }
+        prototype.addReducePaths(outputPaths);
+      }
+
+      // Check for any un-assigned vertices, which should be map-side outputs
+      // that we will need to run in a map-only job.
+      HashMultimap<Target, NodePath> outputPaths = HashMultimap.create();
+      Set<Vertex> orphans = Sets.newHashSet();
+      for (Vertex v : component) {
+
+        // Check if this vertex has multiple inputs but only a subset of
+        // them have already been assigned
+        boolean vertexHasUnassignedIncomingEdges = false;
+        if (v.isOutput()) {
+          for (Edge e : v.getIncomingEdges()) {
+            if (!usedEdges.contains(e)) {
+              vertexHasUnassignedIncomingEdges = true;
+              break;
+            }
+          }
+        }
+
+        if (v.isOutput() && (vertexHasUnassignedIncomingEdges || !assignment.containsKey(v))) {
+          orphans.add(v);
+          for (Edge e : v.getIncomingEdges()) {
+            if (vertexHasUnassignedIncomingEdges && usedEdges.contains(e)) {
+              // We've already dealt with this incoming edge
+              continue;
+            }
+            orphans.add(e.getHead());
+            for (NodePath nodePath : e.getNodePaths()) {
+              PCollectionImpl<?> target = nodePath.tail();
+              for (Target t : outputs.get(target)) {
+                outputPaths.put(t, nodePath);
+              }
+            }
+          }
+        }
+
+      }
+      if (!outputPaths.isEmpty()) {
+        JobPrototype prototype = JobPrototype.createMapOnlyJob(
+            ++lastJobID, outputPaths, pipeline.createTempPath());
+        for (Vertex orphan : orphans) {
+          assignment.put(orphan, prototype);
+        }
+      }
+    }
+
+    return assignment;
+  }
+
+  /**
+   * Makes {@code splitTarget} readable by a downstream job. It reuses an existing SourceTarget
+   * output, or converts a plain target via {@link Target#asSourceTarget}, or else creates an
+   * intermediate output. The collection is materialized there, and an input collection reading
+   * it back is returned.
+   */
+  private InputCollection<?> handleSplitTarget(PCollectionImpl<?> splitTarget) {
+    if (!outputs.containsKey(splitTarget)) {
+      outputs.put(splitTarget, Sets.<Target> newHashSet());
+    }
+
+    // Raw SourceTarget: materializeAt/read need a type argument matching splitTarget's
+    // captured wildcard, which cannot be expressed here.
+    SourceTarget srcTarget = null;
+    Target targetToReplace = null;
+    for (Target t : outputs.get(splitTarget)) {
+      if (t instanceof SourceTarget) {
+        srcTarget = (SourceTarget<?>) t;
+        break;
+      } else {
+        srcTarget = t.asSourceTarget(splitTarget.getPType());
+        if (srcTarget != null) {
+          targetToReplace = t;
+          break;
+        }
+      }
+    }
+    if (targetToReplace != null) {
+      outputs.get(splitTarget).remove(targetToReplace);
+    } else if (srcTarget == null) {
+      srcTarget = pipeline.createIntermediateOutput(splitTarget.getPType());
+    }
+    outputs.get(splitTarget).add(srcTarget);
+    splitTarget.materializeAt(srcTarget);
+
+    return (InputCollection<?>) pipeline.read(srcTarget);
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
new file mode 100644
index 0000000..a090d93
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+
+import com.google.common.collect.Lists;
+
+/**
+ * An ordered chain of collections. The head is the first element and the tail is the last;
+ * {@link #push} and {@link #close} prepend at the head. Equality and hashing are based on the
+ * element sequence.
+ */
+class NodePath implements Iterable<PCollectionImpl<?>> {
+  // Not final: the splitAt methods replace it with the remainder of the split.
+  private LinkedList<PCollectionImpl<?>> path;
+
+  /** Creates an empty path. */
+  public NodePath() {
+    this.path = Lists.newLinkedList();
+  }
+
+  /** Creates a path containing only {@code tail}. */
+  public NodePath(PCollectionImpl<?> tail) {
+    this.path = Lists.newLinkedList();
+    this.path.add(tail);
+  }
+
+  /** Creates a copy of {@code other}; later changes to either path do not affect the other. */
+  public NodePath(NodePath other) {
+    this.path = Lists.newLinkedList(other.path);
+  }
+
+  /** Prepends {@code stage}, making it the new head. */
+  public void push(PCollectionImpl<?> stage) {
+    this.path.push(stage);
+  }
+
+  /** Prepends {@code head} and returns this path, for call chaining. */
+  public NodePath close(PCollectionImpl<?> head) {
+    this.path.push(head);
+    return this;
+  }
+
+  /** Iterates from head to tail. */
+  @Override
+  public Iterator<PCollectionImpl<?>> iterator() {
+    return path.iterator();
+  }
+
+  /** Iterates from tail to head. */
+  public Iterator<PCollectionImpl<?>> descendingIterator() {
+    return path.descendingIterator();
+  }
+
+  public PCollectionImpl<?> get(int index) {
+    return path.get(index);
+  }
+
+  /** Returns the first element, or null if the path is empty. */
+  public PCollectionImpl<?> head() {
+    return path.peekFirst();
+  }
+
+  /** Returns the last element, or null if the path is empty. */
+  public PCollectionImpl<?> tail() {
+    return path.peekLast();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (!(other instanceof NodePath)) {
+      return false;
+    }
+    NodePath nodePath = (NodePath) other;
+    return path.equals(nodePath.path);
+  }
+
+  @Override
+  public int hashCode() {
+    return 17 + 37 * path.hashCode();
+  }
+
+  /** Returns the collection names from head to tail separated by '|'; "" for an empty path. */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    String separator = "";
+    for (PCollectionImpl<?> collect : path) {
+      sb.append(separator).append(collect.getName());
+      separator = "|";
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Splits this path after position {@code splitIndex}. The returned path holds elements
+   * {@code [0, splitIndex]}. This path is rewritten in place to {@code newHead} followed by the
+   * elements after {@code splitIndex}.
+   *
+   * @param splitIndex last index kept in the returned path; -1 keeps nothing
+   * @param newHead collection placed at the head of the remaining path
+   * @return the leading part of the path
+   * @throws IndexOutOfBoundsException if {@code splitIndex < -1} or {@code splitIndex >= size}
+   */
+  public NodePath splitAt(int splitIndex, PCollectionImpl<?> newHead) {
+    if (splitIndex < -1 || splitIndex >= path.size()) {
+      throw new IndexOutOfBoundsException("Split index " + splitIndex
+          + " out of range for node path of size " + path.size());
+    }
+    NodePath top = new NodePath();
+    top.path.addAll(path.subList(0, splitIndex + 1));
+    LinkedList<PCollectionImpl<?>> nextPath = Lists.newLinkedList();
+    nextPath.add(newHead);
+    nextPath.addAll(path.subList(splitIndex + 1, path.size()));
+    path = nextPath;
+    return top;
+  }
+
+  /**
+   * Splits this path right after the first element that is identical ({@code ==}) to
+   * {@code split}; see {@link #splitAt(int, PCollectionImpl)}.
+   *
+   * @throws IllegalArgumentException if {@code split} is not in this path (the path is left
+   *     unchanged)
+   */
+  public NodePath splitAt(PCollectionImpl<?> split, PCollectionImpl<?> newHead) {
+    int splitIndex = -1;
+    int index = 0;
+    for (PCollectionImpl<?> p : path) {
+      if (p == split) {
+        splitIndex = index;
+        break;
+      }
+      index++;
+    }
+    if (splitIndex < 0) {
+      throw new IllegalArgumentException("Collection " + split + " is not part of node path " + this);
+    }
+    return splitAt(splitIndex, newHead);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
new file mode 100644
index 0000000..b90a911
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+/**
+ * Collection of Configuration keys and various constants used when planning MapReduce jobs for a
+ * pipeline.
+ */
+public class PlanningParameters {
+
+  /**
+   * Configuration key whose value is a <a href="http://www.graphviz.org">DOT</a> rendering of
+   * the planned job graph, written there by the planner.
+   */
+  public static final String PIPELINE_PLAN_DOTFILE = "crunch.planner.dotfile";
+
+  /** Configuration key holding the working directory of the job being configured. */
+  public static final String CRUNCH_WORKING_DIRECTORY = "crunch.work.dir";
+
+  /** Prefix of the named outputs of a job; a numeric suffix is appended per output. */
+  public static final String MULTI_OUTPUT_PREFIX = "out";
+
+  // Constants holder only; never instantiated.
+  private PlanningParameters() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
new file mode 100644
index 0000000..f4aa668
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.crunch.Source;
+import org.apache.crunch.impl.mr.collect.InputCollection;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ *
+ */
+class Vertex {
+ private final PCollectionImpl impl;
+
+ private boolean output;
+ private Set<Edge> incoming;
+ private Set<Edge> outgoing;
+
+ public Vertex(PCollectionImpl impl) {
+ this.impl = impl;
+ this.incoming = Sets.newHashSet();
+ this.outgoing = Sets.newHashSet();
+ }
+
+ public PCollectionImpl getPCollection() {
+ return impl;
+ }
+
+ public boolean isInput() {
+ return impl instanceof InputCollection;
+ }
+
+ public boolean isGBK() {
+ return impl instanceof PGroupedTableImpl;
+ }
+
+ public void setOutput() {
+ this.output = true;
+ }
+
+ public boolean isOutput() {
+ return output;
+ }
+
+ public Source getSource() {
+ if (isInput()) {
+ return ((InputCollection) impl).getSource();
+ }
+ return null;
+ }
+
+ public void addIncoming(Edge edge) {
+ this.incoming.add(edge);
+ }
+
+ public void addOutgoing(Edge edge) {
+ this.outgoing.add(edge);
+ }
+
+ public List<Vertex> getAllNeighbors() {
+ List<Vertex> n = Lists.newArrayList();
+ for (Edge e : incoming) {
+ n.add(e.getHead());
+ }
+ for (Edge e : outgoing) {
+ n.add(e.getTail());
+ }
+ return n;
+ }
+
+ public Set<Edge> getAllEdges() {
+ return Sets.union(incoming, outgoing);
+ }
+
+ public Set<Edge> getIncomingEdges() {
+ return incoming;
+ }
+
+ public Set<Edge> getOutgoingEdges() {
+ return outgoing;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null || !(obj instanceof Vertex)) {
+ return false;
+ }
+ Vertex other = (Vertex) obj;
+ return impl.equals(other.impl);
+ }
+
+ @Override
+ public int hashCode() {
+ return 17 + 37 * impl.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return new ReflectionToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).setExcludeFieldNames(
+ new String[] { "outgoing", "incoming" }).toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java
new file mode 100644
index 0000000..47a3ded
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+/**
+ * Combiner for Crunch jobs. It reuses all of {@link CrunchReducer}'s processing and only swaps
+ * the node context, so the task loads the nodes planned for {@link NodeContext#COMBINE}.
+ */
+public class CrunchCombiner extends CrunchReducer {
+
+  /** Always {@link NodeContext#COMBINE}. */
+  @Override
+  protected NodeContext getNodeContext() {
+    return NodeContext.COMBINE;
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
new file mode 100644
index 0000000..eb5dd8a
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.crunch.io.CrunchInputs;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.collect.Lists;
+
+/**
+ * InputFormat that multiplexes all of a job's inputs. For every registered
+ * ({@link FormatBundle}, plan node) pair it asks the underlying InputFormat for splits and
+ * wraps each one in a {@link CrunchInputSplit} carrying the format class, the node index and
+ * the Configuration to read it with.
+ */
+public class CrunchInputFormat<K, V> extends InputFormat<K, V> {
+
+  /**
+   * Computes the splits for every input registered through {@link CrunchInputs}.
+   *
+   * @param job the job whose configuration holds the registered inputs
+   * @return one {@link CrunchInputSplit} per split produced by the underlying formats
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
+    List<InputSplit> splits = Lists.newArrayList();
+    Configuration base = job.getConfiguration();
+    Map<FormatBundle, Map<Integer, List<Path>>> formatNodeMap = CrunchInputs.getFormatNodeMap(job);
+
+    // Each FormatBundle is configured on its own copy of the job configuration, so
+    // bundle-specific settings do not leak between inputs.
+    for (Map.Entry<FormatBundle, Map<Integer, List<Path>>> entry : formatNodeMap.entrySet()) {
+      FormatBundle inputBundle = entry.getKey();
+      Configuration conf = new Configuration(base);
+      inputBundle.configure(conf);
+      Job jobCopy = new Job(conf);
+      InputFormat<?, ?> format = (InputFormat<?, ?>) ReflectionUtils.newInstance(inputBundle.getFormatClass(),
+          jobCopy.getConfiguration());
+      for (Map.Entry<Integer, List<Path>> nodeEntry : entry.getValue().entrySet()) {
+        Integer nodeIndex = nodeEntry.getKey();
+        List<Path> paths = nodeEntry.getValue();
+        FileInputFormat.setInputPaths(jobCopy, paths.toArray(new Path[paths.size()]));
+
+        // Let the underlying format split this node's paths, then tag every split with the
+        // format class and node index by wrapping it in a CrunchInputSplit.
+        List<InputSplit> pathSplits = format.getSplits(jobCopy);
+
+        // jobCopy's configuration is mutated again by setInputPaths for the next node, while a
+        // CrunchInputSplit keeps a reference to the Configuration it receives and serializes it
+        // in write(), possibly after this method returns. Snapshot it so this node's splits keep
+        // this node's settings instead of whichever node was processed last.
+        Configuration splitConf = new Configuration(jobCopy.getConfiguration());
+        for (InputSplit pathSplit : pathSplits) {
+          splits.add(new CrunchInputSplit(pathSplit, inputBundle.getFormatClass(),
+              nodeIndex, splitConf));
+        }
+      }
+    }
+    return splits;
+  }
+
+  /** Returns a reader that delegates to the InputFormat recorded in the {@link CrunchInputSplit}. */
+  @Override
+  public RecordReader<K, V> createRecordReader(InputSplit inputSplit, TaskAttemptContext context) throws IOException,
+      InterruptedException {
+    return new CrunchRecordReader<K, V>(inputSplit, context);
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
new file mode 100644
index 0000000..b41062b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Wraps the split of an underlying InputFormat together with the InputFormat class, the index
+ * of the plan node that consumes it, and the Configuration to read it with.
+ *
+ * <p>Wire layout written by {@link #write(DataOutput)} and consumed in the same order by
+ * {@link #readFields(DataInput)}: node index (int), Configuration, InputFormat class name,
+ * wrapped split class name, then the wrapped split itself via {@link SerializationFactory}.
+ */
+class CrunchInputSplit extends InputSplit implements Writable {
+
+  private InputSplit inputSplit;
+  private Class<? extends InputFormat<?, ?>> inputFormatClass;
+  private int nodeIndex;
+  private Configuration conf;
+
+  /** Empty constructor; all fields are populated later by {@link #readFields(DataInput)}. */
+  public CrunchInputSplit() {
+  }
+
+  public CrunchInputSplit(
+      InputSplit inputSplit,
+      Class<? extends InputFormat<?, ?>> inputFormatClass,
+      int nodeIndex,
+      Configuration conf) {
+    this.inputSplit = inputSplit;
+    this.inputFormatClass = inputFormatClass;
+    this.nodeIndex = nodeIndex;
+    this.conf = conf;
+  }
+
+  /** Configuration used to instantiate the format and read the wrapped split. */
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /** Index of the plan node that should process this split's records. */
+  public int getNodeIndex() {
+    return nodeIndex;
+  }
+
+  /** The split produced by the underlying InputFormat. */
+  public InputSplit getInputSplit() {
+    return inputSplit;
+  }
+
+  /** The InputFormat class that produced {@link #getInputSplit()}. */
+  public Class<? extends InputFormat<?, ?>> getInputFormatClass() {
+    return inputFormatClass;
+  }
+
+  /** Forwards to the wrapped split. */
+  @Override
+  public long getLength() throws IOException, InterruptedException {
+    return inputSplit.getLength();
+  }
+
+  /** Forwards to the wrapped split. */
+  @Override
+  public String[] getLocations() throws IOException, InterruptedException {
+    return inputSplit.getLocations();
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void readFields(DataInput in) throws IOException {
+    this.nodeIndex = in.readInt();
+    // conf must be assigned before readClass(), which resolves class names through it.
+    this.conf = new Configuration();
+    this.conf.readFields(in);
+    this.inputFormatClass = (Class<? extends InputFormat<?, ?>>) readClass(in);
+    Class<? extends InputSplit> splitClass = (Class<? extends InputSplit>) readClass(in);
+    this.inputSplit = (InputSplit) ReflectionUtils.newInstance(splitClass, conf);
+    Deserializer deserializer = new SerializationFactory(conf).getDeserializer(splitClass);
+    // NOTE(review): assumes the caller passes a DataInputStream; any other DataInput fails here.
+    deserializer.open((DataInputStream) in);
+    this.inputSplit = (InputSplit) deserializer.deserialize(this.inputSplit);
+  }
+
+  /** Reads a class name written with {@link Text#writeString} and loads it via {@link #conf}. */
+  private Class<?> readClass(DataInput in) throws IOException {
+    String name = Text.readString(in);
+    try {
+      return conf.getClassByName(name);
+    } catch (ClassNotFoundException cnfe) {
+      throw new RuntimeException("readObject can't find class", cnfe);
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(nodeIndex);
+    conf.write(out);
+    Text.writeString(out, inputFormatClass.getName());
+    Class<? extends InputSplit> splitClass = inputSplit.getClass();
+    Text.writeString(out, splitClass.getName());
+    Serializer serializer = new SerializationFactory(conf).getSerializer(splitClass);
+    // NOTE(review): assumes the caller passes a DataOutputStream; any other DataOutput fails here.
+    serializer.open((DataOutputStream) out);
+    serializer.serialize(inputSplit);
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
new file mode 100644
index 0000000..70f0b01
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.hadoop.mapreduce.Mapper;
+
+public class CrunchMapper extends Mapper<Object, Object, Object, Object> {
+
+ private static final Log LOG = LogFactory.getLog(CrunchMapper.class);
+
+ private RTNode node;
+ private CrunchTaskContext ctxt;
+ private boolean debug;
+
+ @Override
+ protected void setup(Mapper<Object, Object, Object, Object>.Context context) {
+ List<RTNode> nodes;
+ this.ctxt = new CrunchTaskContext(context, NodeContext.MAP);
+ try {
+ nodes = ctxt.getNodes();
+ } catch (IOException e) {
+ LOG.info("Crunch deserialization error", e);
+ throw new CrunchRuntimeException(e);
+ }
+ if (nodes.size() == 1) {
+ this.node = nodes.get(0);
+ } else {
+ CrunchInputSplit split = (CrunchInputSplit) context.getInputSplit();
+ this.node = nodes.get(split.getNodeIndex());
+ }
+ this.debug = ctxt.isDebugRun();
+ }
+
+ @Override
+ protected void map(Object k, Object v, Mapper<Object, Object, Object, Object>.Context context) {
+ if (debug) {
+ try {
+ node.process(k, v);
+ } catch (Exception e) {
+ LOG.error("Mapper exception", e);
+ }
+ } else {
+ node.process(k, v);
+ }
+ }
+
+ @Override
+ protected void cleanup(Mapper<Object, Object, Object, Object>.Context context) {
+ node.cleanup();
+ ctxt.cleanup();
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
new file mode 100644
index 0000000..fc8fb32
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.IOException;
+
+import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * RecordReader that unwraps a {@link CrunchInputSplit} and forwards every call to a reader
+ * created by the split's recorded InputFormat, using the split's own Configuration.
+ */
+class CrunchRecordReader<K, V> extends RecordReader<K, V> {
+
+  /** Reader obtained from the wrapped InputFormat; all reads are forwarded to it. */
+  private final RecordReader<K, V> delegate;
+
+  /**
+   * Instantiates the split's InputFormat and asks it for a reader over the wrapped split.
+   *
+   * @param inputSplit must be a {@link CrunchInputSplit}
+   * @param context supplies the task attempt id for the delegate's context
+   */
+  @SuppressWarnings("unchecked")
+  public CrunchRecordReader(InputSplit inputSplit, final TaskAttemptContext context) throws IOException,
+      InterruptedException {
+    CrunchInputSplit split = (CrunchInputSplit) inputSplit;
+    InputFormat<K, V> format = (InputFormat<K, V>) ReflectionUtils.newInstance(
+        split.getInputFormatClass(), split.getConf());
+    TaskAttemptContext delegateContext = TaskAttemptContextFactory.create(split.getConf(),
+        context.getTaskAttemptID());
+    this.delegate = format.createRecordReader(split.getInputSplit(), delegateContext);
+  }
+
+  @Override
+  public void close() throws IOException {
+    delegate.close();
+  }
+
+  @Override
+  public K getCurrentKey() throws IOException, InterruptedException {
+    return delegate.getCurrentKey();
+  }
+
+  @Override
+  public V getCurrentValue() throws IOException, InterruptedException {
+    return delegate.getCurrentValue();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return delegate.getProgress();
+  }
+
+  /** Initializes the delegate with the unwrapped split and a context built from the split's conf. */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+    CrunchInputSplit split = (CrunchInputSplit) inputSplit;
+    TaskAttemptContext delegateContext = TaskAttemptContextFactory.create(split.getConf(),
+        context.getTaskAttemptID());
+    delegate.initialize(split.getInputSplit(), delegateContext);
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    return delegate.nextKeyValue();
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
new file mode 100644
index 0000000..e5ddbd2
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.impl.SingleUseIterable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+public class CrunchReducer extends Reducer<Object, Object, Object, Object> {
+
+ private static final Log LOG = LogFactory.getLog(CrunchReducer.class);
+
+ private RTNode node;
+ private CrunchTaskContext ctxt;
+ private boolean debug;
+
+ protected NodeContext getNodeContext() {
+ return NodeContext.REDUCE;
+ }
+
+ @Override
+ protected void setup(Reducer<Object, Object, Object, Object>.Context context) {
+ this.ctxt = new CrunchTaskContext(context, getNodeContext());
+ try {
+ List<RTNode> nodes = ctxt.getNodes();
+ this.node = nodes.get(0);
+ } catch (IOException e) {
+ LOG.info("Crunch deserialization error", e);
+ throw new CrunchRuntimeException(e);
+ }
+ this.debug = ctxt.isDebugRun();
+ }
+
+ @Override
+ protected void reduce(Object key, Iterable<Object> values, Reducer<Object, Object, Object, Object>.Context context) {
+ values = new SingleUseIterable<Object>(values);
+ if (debug) {
+ try {
+ node.processIterable(key, values);
+ } catch (Exception e) {
+ LOG.error("Reducer exception", e);
+ }
+ } else {
+ node.processIterable(key, values);
+ }
+ }
+
+ @Override
+ protected void cleanup(Reducer<Object, Object, Object, Object>.Context context) {
+ node.cleanup();
+ ctxt.cleanup();
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
new file mode 100644
index 0000000..c4f2873
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.impl.mr.plan.PlanningParameters;
+import org.apache.crunch.io.CrunchOutputs;
+import org.apache.crunch.util.DistCache;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Per-task state used by the Crunch mapper/reducer/combiner. It holds the Hadoop task
+ * context, the {@link NodeContext} being executed, and a lazily created {@link CrunchOutputs}.
+ */
+class CrunchTaskContext {
+
+  private final TaskInputOutputContext<Object, Object, Object, Object> taskContext;
+  private final NodeContext nodeContext;
+  private CrunchOutputs<Object, Object> multipleOutputs;
+
+  public CrunchTaskContext(TaskInputOutputContext<Object, Object, Object, Object> taskContext, NodeContext nodeContext) {
+    this.taskContext = taskContext;
+    this.nodeContext = nodeContext;
+  }
+
+  public TaskInputOutputContext<Object, Object, Object, Object> getContext() {
+    return taskContext;
+  }
+
+  public NodeContext getNodeContext() {
+    return nodeContext;
+  }
+
+  /**
+   * Reads the serialized nodes for this task's {@link NodeContext} from
+   * {@code <crunch.work.dir>/<NODE_CONTEXT>} via {@link DistCache}, and initializes each one
+   * against this context.
+   *
+   * @return the deserialized nodes, or {@code null} if {@link DistCache#read} returned null
+   * @throws IOException if the nodes cannot be read
+   */
+  public List<RTNode> getNodes() throws IOException {
+    Configuration conf = taskContext.getConfiguration();
+    Path path = new Path(new Path(conf.get(PlanningParameters.CRUNCH_WORKING_DIRECTORY)), nodeContext.toString());
+    @SuppressWarnings("unchecked")
+    List<RTNode> nodes = (List<RTNode>) DistCache.read(conf, path);
+    if (nodes != null) {
+      for (RTNode node : nodes) {
+        node.initialize(this);
+      }
+    }
+    return nodes;
+  }
+
+  /** Whether {@link RuntimeParameters#DEBUG} is set to true (defaults to false). */
+  public boolean isDebugRun() {
+    Configuration conf = taskContext.getConfiguration();
+    return conf.getBoolean(RuntimeParameters.DEBUG, false);
+  }
+
+  /**
+   * Closes the multiple outputs if any were created.
+   *
+   * @throws CrunchRuntimeException wrapping an IOException or InterruptedException from close
+   */
+  public void cleanup() {
+    if (multipleOutputs != null) {
+      try {
+        multipleOutputs.close();
+      } catch (IOException e) {
+        throw new CrunchRuntimeException(e);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag that catching the exception cleared, so callers further
+        // up the stack can still observe the interruption.
+        Thread.currentThread().interrupt();
+        throw new CrunchRuntimeException(e);
+      }
+    }
+  }
+
+  /** Returns the {@link CrunchOutputs} for this task, creating it on first use. */
+  public CrunchOutputs<Object, Object> getMultipleOutputs() {
+    if (multipleOutputs == null) {
+      multipleOutputs = new CrunchOutputs<Object, Object>(taskContext);
+    }
+    return multipleOutputs;
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java
new file mode 100644
index 0000000..ffc9e7c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.util.Locale;
+
+import org.apache.crunch.impl.mr.plan.DoNode;
+
+/**
+ * Enum that is associated with a serialized {@link DoNode} instance, so we know
+ * how to use it within the context of a particular MR job.
+ */
+public enum NodeContext {
+  MAP,
+  REDUCE,
+  COMBINE;
+
+  /**
+   * Returns the configuration key for this context, e.g. {@code crunch.donode.combine}.
+   *
+   * <p>Lower-casing uses {@link Locale#ENGLISH} so that the key does not depend on the JVM's
+   * default locale: under a Turkish locale, {@code "COMBINE".toLowerCase()} produces a dotless
+   * {@code ı}, which would yield a different key.
+   */
+  public String getConfigurationKey() {
+    return "crunch.donode." + name().toLowerCase(Locale.ENGLISH);
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
new file mode 100644
index 0000000..ce7b795
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.impl.mr.emit.IntermediateEmitter;
+import org.apache.crunch.impl.mr.emit.MultipleOutputEmitter;
+import org.apache.crunch.impl.mr.emit.OutputEmitter;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PType;
+
+/**
+ * Task-side node of a Crunch plan. It wraps a {@link DoFn}, an input {@link Converter} that
+ * turns MapReduce key/value pairs into DoFn inputs, an optional output converter, and the
+ * child nodes that receive this node's output.
+ *
+ * <p>Instances are {@link Serializable}. The {@link Emitter} is transient and is created in
+ * {@link #initialize(CrunchTaskContext)}.
+ */
+public class RTNode implements Serializable {
+
+  private static final Log LOG = LogFactory.getLog(RTNode.class);
+
+  private final String nodeName;
+  private DoFn<Object, Object> fn;
+  private PType<Object> outputPType;
+  private final List<RTNode> children;
+  private final Converter inputConverter;
+  private final Converter outputConverter;
+  private final String outputName;
+
+  private transient Emitter<Object> emitter;
+
+  public RTNode(DoFn<Object, Object> fn, PType<Object> outputPType, String name, List<RTNode> children,
+      Converter inputConverter,
+      Converter outputConverter, String outputName) {
+    this.fn = fn;
+    this.outputPType = outputPType;
+    this.nodeName = name;
+    this.children = children;
+    this.inputConverter = inputConverter;
+    this.outputConverter = outputConverter;
+    this.outputName = outputName;
+  }
+
+  /**
+   * Initializes the DoFn and all children, then picks an emitter. The choices are checked in
+   * this order:
+   * <ul>
+   * <li>a {@link MultipleOutputEmitter} when there is an output converter and a named output;</li>
+   * <li>an {@link OutputEmitter} when there is an output converter only;</li>
+   * <li>an {@link IntermediateEmitter} feeding the children otherwise.</li>
+   * </ul>
+   * Calling this again after an emitter exists is a no-op.
+   *
+   * @throws CrunchRuntimeException if the node has neither an output converter nor children
+   */
+  public void initialize(CrunchTaskContext ctxt) {
+    if (emitter != null) {
+      // Already initialized
+      return;
+    }
+
+    fn.setContext(ctxt.getContext());
+    fn.initialize();
+    for (RTNode child : children) {
+      child.initialize(ctxt);
+    }
+
+    if (outputConverter != null) {
+      if (outputName != null) {
+        this.emitter = new MultipleOutputEmitter(outputConverter, ctxt.getMultipleOutputs(),
+            outputName);
+      } else {
+        this.emitter = new OutputEmitter(outputConverter, ctxt.getContext());
+      }
+    } else if (!children.isEmpty()) {
+      this.emitter = new IntermediateEmitter(outputPType, children,
+          ctxt.getContext().getConfiguration());
+    } else {
+      throw new CrunchRuntimeException("Invalid RTNode config: no emitter for: " + nodeName);
+    }
+  }
+
+  /** True when this node writes output directly and has no children. */
+  public boolean isLeafNode() {
+    return outputConverter != null && children.isEmpty();
+  }
+
+  /**
+   * Runs the DoFn on one already-converted input. A {@link CrunchRuntimeException} is logged
+   * once (then marked as logged) and rethrown.
+   */
+  public void process(Object input) {
+    try {
+      fn.process(input, emitter);
+    } catch (CrunchRuntimeException e) {
+      if (!e.wasLogged()) {
+        // String.valueOf tolerates a null input; input.toString() would throw an NPE from
+        // inside this catch block and hide the original exception.
+        LOG.info(String.format("Crunch exception in '%s' for input: %s", nodeName, String.valueOf(input)), e);
+        e.markLogged();
+      }
+      throw e;
+    }
+  }
+
+  /** Converts a map-side key/value pair with the input converter and processes it. */
+  public void process(Object key, Object value) {
+    process(inputConverter.convertInput(key, value));
+  }
+
+  /** Converts a reduce-side key and its values with the input converter and processes them. */
+  public void processIterable(Object key, Iterable values) {
+    process(inputConverter.convertIterableInput(key, values));
+  }
+
+  /** Runs the DoFn's cleanup, flushes the emitter, then cleans up the children in order. */
+  public void cleanup() {
+    fn.cleanup(emitter);
+    emitter.flush();
+    for (RTNode child : children) {
+      child.cleanup();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "RTNode [nodeName=" + nodeName + ", fn=" + fn + ", children=" + children + ", inputConverter="
+        + inputConverter + ", outputConverter=" + outputConverter + ", outputName=" + outputName + "]";
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
new file mode 100644
index 0000000..604c49c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
/**
 * Configuration property names used during the runtime execution of a pipeline.
 *
 * <p>This is a constants holder only; it cannot be instantiated or subclassed.
 */
public final class RuntimeParameters {

  /** Configuration key {@value}. */
  public static final String AGGREGATOR_BUCKETS = "crunch.aggregator.buckets";

  /** Configuration key {@value}. */
  public static final String DEBUG = "crunch.debug";

  /** Configuration key {@value}; presumably names a temporary working directory — TODO confirm. */
  public static final String TMP_DIR = "crunch.tmp.dir";

  /** Configuration key {@value}. */
  public static final String LOG_JOB_PROGRESS = "crunch.log.job.progress";

  /**
   * Configuration key {@value}. Unlike the other keys here it lives in the {@code mapreduce.}
   * namespace rather than {@code crunch.}.
   */
  public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";

  // Not instantiated
  private RuntimeParameters() {
  }
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/At.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/At.java b/crunch-core/src/main/java/org/apache/crunch/io/At.java
new file mode 100644
index 0000000..a6f0782
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/At.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.TableSourceTarget;
+import org.apache.crunch.io.avro.AvroFileSourceTarget;
+import org.apache.crunch.io.seq.SeqFileSourceTarget;
+import org.apache.crunch.io.seq.SeqFileTableSourceTarget;
+import org.apache.crunch.io.text.TextFileSourceTarget;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * <p>Static factory methods for creating common {@link SourceTarget} types, which may be treated as both a {@code Source}
+ * and a {@code Target}.</p>
+ *
+ * <p>The {@code At} methods is analogous to the {@link From} and {@link To} factory methods, but is used for
+ * storing intermediate outputs that need to be passed from one run of a MapReduce pipeline to another run. The
+ * {@code SourceTarget} object acts as both a {@code Source} and a {@Target}, which enables it to provide this
+ * functionality.
+ *
+ * <code>
+ * Pipeline pipeline = new MRPipeline(this.getClass());
+ * // Create our intermediate storage location
+ * SourceTarget<String> intermediate = At.textFile("/temptext");
+ * ...
+ * // Write out the output of the first phase of a pipeline.
+ * pipeline.write(phase1, intermediate);
+ *
+ * // Explicitly call run to kick off the pipeline.
+ * pipeline.run();
+ *
+ * // And then kick off a second phase by consuming the output
+ * // from the first phase.
+ * PCollection<String> phase2Input = pipeline.read(intermediate);
+ * ...
+ * </code>
+ * </p>
+ *
+ * <p>The {@code SourceTarget} abstraction is useful when we care about reading the intermediate
+ * outputs of a pipeline as well as the final results.</p>
+ */
+public class At {
+
+ /**
+ * Creates a {@code SourceTarget<T>} instance from the Avro file(s) at the given path name.
+ *
+ * @param pathName The name of the path to the data on the filesystem
+ * @param avroClass The subclass of {@code SpecificRecord} to use for the Avro file
+ * @return A new {@code SourceTarget<T>} instance
+ */
+ public static <T extends SpecificRecord> SourceTarget<T> avroFile(String pathName, Class<T> avroClass) {
+ return avroFile(new Path(pathName), avroClass);
+ }
+
+ /**
+ * Creates a {@code SourceTarget<T>} instance from the Avro file(s) at the given {@code Path}.
+ *
+ * @param path The {@code Path} to the data
+ * @param avroClass The subclass of {@code SpecificRecord} to use for the Avro file
+ * @return A new {@code SourceTarget<T>} instance
+ */
+ public static <T extends SpecificRecord> SourceTarget<T> avroFile(Path path, Class<T> avroClass) {
+ return avroFile(path, Avros.specifics(avroClass));
+ }
+
+ /**
+ * Creates a {@code SourceTarget<T>} instance from the Avro file(s) at the given path name.
+ *
+ * @param pathName The name of the path to the data on the filesystem
+ * @param avroType The {@code AvroType} for the Avro records
+ * @return A new {@code SourceTarget<T>} instance
+ */
+ public static <T> SourceTarget<T> avroFile(String pathName, AvroType<T> avroType) {
+ return avroFile(new Path(pathName), avroType);
+ }
+
+ /**
+ * Creates a {@code SourceTarget<T>} instance from the Avro file(s) at the given {@code Path}.
+ *
+ * @param path The {@code Path} to the data
+ * @param avroType The {@code AvroType} for the Avro records
+ * @return A new {@code SourceTarget<T>} instance
+ */
+ public static <T> SourceTarget<T> avroFile(Path path, AvroType<T> avroType) {
+ return new AvroFileSourceTarget<T>(path, avroType);
+ }
+
+ /**
+ * Creates a {@code SourceTarget<T>} instance from the SequenceFile(s) at the given path name
+ * from the value field of each key-value pair in the SequenceFile(s).
+ *
+ * @param pathName The name of the path to the data on the filesystem
+ * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
+ * @return A new {@code SourceTarget<T>} instance
+ */
+ public static <T extends Writable> SourceTarget<T> sequenceFile(String pathName, Class<T> valueClass) {
+ return sequenceFile(new Path(pathName), valueClass);
+ }
+
+ /**
+ * Creates a {@code SourceTarget<T>} instance from the SequenceFile(s) at the given {@code Path}
+ * from the value field of each key-value pair in the SequenceFile(s).
+ *
+ * @param path The {@code Path} to the data
+ * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
+ * @return A new {@code SourceTarget<T>} instance
+ */
+ public static <T extends Writable> SourceTarget<T> sequenceFile(Path path, Class<T> valueClass) {
+ return sequenceFile(path, Writables.writables(valueClass));
+ }
+
+ /**
+ * Creates a {@code SourceTarget<T>} instance from the SequenceFile(s) at the given path name
+ * from the value field of each key-value pair in the SequenceFile(s).
+ *
+ * @param pathName The name of the path to the data on the filesystem
+ * @param ptype The {@code PType} for the value of the SequenceFile entry
+ * @return A new {@code SourceTarget<T>} instance
+ */
+ public static <T> SourceTarget<T> sequenceFile(String pathName, PType<T> ptype) {
+ return sequenceFile(new Path(pathName), ptype);
+ }
+
+ /**
+ * Creates a {@code SourceTarget<T>} instance from the SequenceFile(s) at the given {@code Path}
+ * from the value field of each key-value pair in the SequenceFile(s).
+ *
+ * @param path The {@code Path} to the data
+ * @param ptype The {@code PType} for the value of the SequenceFile entry
+ * @return A new {@code SourceTarget<T>} instance
+ */
+ public static <T> SourceTarget<T> sequenceFile(Path path, PType<T> ptype) {
+ return new SeqFileSourceTarget<T>(path, ptype);
+ }
+
+ /**
+ * Creates a {@code TableSourceTarget<K, V>} instance from the SequenceFile(s) at the given path name
+ * from the key-value pairs in the SequenceFile(s).
+ *
+ * @param pathName The name of the path to the data on the filesystem
+ * @param keyClass The {@code Writable} type for the key of the SequenceFile entry
+ * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
+ * @return A new {@code TableSourceTarget<K, V>} instance
+ */
+ public static <K extends Writable, V extends Writable> TableSourceTarget<K, V> sequenceFile(
+ String pathName, Class<K> keyClass, Class<V> valueClass) {
+ return sequenceFile(new Path(pathName), keyClass, valueClass);
+ }
+
+ /**
+ * Creates a {@code TableSourceTarget<K, V>} instance from the SequenceFile(s) at the given {@code Path}
+ * from the key-value pairs in the SequenceFile(s).
+ *
+ * @param path The {@code Path} to the data
+ * @param keyClass The {@code Writable} type for the key of the SequenceFile entry
+ * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
+ * @return A new {@code TableSourceTarget<K, V>} instance
+ */
+ public static <K extends Writable, V extends Writable> TableSourceTarget<K, V> sequenceFile(
+ Path path, Class<K> keyClass, Class<V> valueClass) {
+ return sequenceFile(path, Writables.writables(keyClass), Writables.writables(valueClass));
+ }
+
+ /**
+ * Creates a {@code TableSourceTarget<K, V>} instance from the SequenceFile(s) at the given path name
+ * from the key-value pairs in the SequenceFile(s).
+ *
+ * @param pathName The name of the path to the data on the filesystem
+ * @param keyType The {@code PType} for the key of the SequenceFile entry
+ * @param valueType The {@code PType} for the value of the SequenceFile entry
+ * @return A new {@code TableSourceTarget<K, V>} instance
+ */
+ public static <K, V> TableSourceTarget<K, V> sequenceFile(String pathName, PType<K> keyType, PType<V> valueType) {
+ return sequenceFile(new Path(pathName), keyType, valueType);
+ }
+
+ /**
+ * Creates a {@code TableSourceTarget<K, V>} instance from the SequenceFile(s) at the given {@code Path}
+ * from the key-value pairs in the SequenceFile(s).
+ *
+ * @param path The {@code Path} to the data
+ * @param keyType The {@code PType} for the key of the SequenceFile entry
+ * @param valueType The {@code PType} for the value of the SequenceFile entry
+ * @return A new {@code TableSourceTarget<K, V>} instance
+ */
+ public static <K, V> TableSourceTarget<K, V> sequenceFile(Path path, PType<K> keyType, PType<V> valueType) {
+ PTypeFamily ptf = keyType.getFamily();
+ return new SeqFileTableSourceTarget<K, V>(path, ptf.tableOf(keyType, valueType));
+ }
+
+ /**
+ * Creates a {@code SourceTarget<String>} instance for the text file(s) at the given path name.
+ *
+ * @param pathName The name of the path to the data on the filesystem
+ * @return A new {@code SourceTarget<String>} instance
+ */
+ public static SourceTarget<String> textFile(String pathName) {
+ return textFile(new Path(pathName));
+ }
+
+ /**
+ * Creates a {@code SourceTarget<String>} instance for the text file(s) at the given {@code Path}.
+ *
+ * @param path The {@code Path} to the data
+ * @return A new {@code SourceTarget<String>} instance
+ */
+ public static SourceTarget<String> textFile(Path path) {
+ return textFile(path, Writables.strings());
+ }
+
+ /**
+ * Creates a {@code SourceTarget<T>} instance for the text file(s) at the given path name using
+ * the provided {@code PType<T>} to convert the input text.
+ *
+ * @param pathName The name of the path to the data on the filesystem
+ * @param ptype The {@code PType<T>} to use to process the input text
+ * @return A new {@code SourceTarget<T>} instance
+ */
+ public static <T> SourceTarget<T> textFile(String pathName, PType<T> ptype) {
+ return textFile(new Path(pathName), ptype);
+ }
+
+ /**
+ * Creates a {@code SourceTarget<T>} instance for the text file(s) at the given {@code Path} using
+ * the provided {@code PType<T>} to convert the input text.
+ *
+ * @param path The {@code Path} to the data
+ * @param ptype The {@code PType<T>} to use to process the input text
+ * @return A new {@code SourceTarget<T>} instance
+ */
+ public static <T> SourceTarget<T> textFile(Path path, PType<T> ptype) {
+ return new TextFileSourceTarget<T>(path, ptype);
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/CompositePathIterable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CompositePathIterable.java b/crunch-core/src/main/java/org/apache/crunch/io/CompositePathIterable.java
new file mode 100644
index 0000000..a4723e9
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CompositePathIterable.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

import com.google.common.collect.UnmodifiableIterator;
+
+public class CompositePathIterable<T> implements Iterable<T> {
+
+ private final FileStatus[] stati;
+ private final FileSystem fs;
+ private final FileReaderFactory<T> readerFactory;
+
+ private static final PathFilter FILTER = new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return !path.getName().startsWith("_");
+ }
+ };
+
+ public static <S> Iterable<S> create(FileSystem fs, Path path, FileReaderFactory<S> readerFactory) throws IOException {
+
+ if (!fs.exists(path)) {
+ throw new IOException("No files found to materialize at: " + path);
+ }
+
+ FileStatus[] stati = null;
+ try {
+ stati = fs.listStatus(path, FILTER);
+ } catch (FileNotFoundException e) {
+ stati = null;
+ }
+ if (stati == null) {
+ throw new IOException("No files found to materialize at: " + path);
+ }
+
+ if (stati.length == 0) {
+ return Collections.emptyList();
+ } else {
+ return new CompositePathIterable<S>(stati, fs, readerFactory);
+ }
+
+ }
+
+ private CompositePathIterable(FileStatus[] stati, FileSystem fs, FileReaderFactory<T> readerFactory) {
+ this.stati = stati;
+ this.fs = fs;
+ this.readerFactory = readerFactory;
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+
+ return new UnmodifiableIterator<T>() {
+ private int index = 0;
+ private Iterator<T> iter = readerFactory.read(fs, stati[index++].getPath());
+
+ @Override
+ public boolean hasNext() {
+ if (!iter.hasNext()) {
+ while (index < stati.length) {
+ iter = readerFactory.read(fs, stati[index++].getPath());
+ if (iter.hasNext()) {
+ return true;
+ }
+ }
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public T next() {
+ return iter.next();
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
new file mode 100644
index 0000000..d154db2
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Helper functions for configuring multiple {@code InputFormat} instances within a single
+ * Crunch MapReduce job.
+ */
+public class CrunchInputs {
+ public static final String CRUNCH_INPUTS = "crunch.inputs.dir";
+
+ private static final char RECORD_SEP = ',';
+ private static final char FIELD_SEP = ';';
+ private static final Joiner JOINER = Joiner.on(FIELD_SEP);
+ private static final Splitter SPLITTER = Splitter.on(FIELD_SEP);
+
+ public static void addInputPath(Job job, Path path, FormatBundle inputBundle, int nodeIndex) {
+ Configuration conf = job.getConfiguration();
+ String inputs = JOINER.join(inputBundle.serialize(), String.valueOf(nodeIndex), path.toString());
+ String existing = conf.get(CRUNCH_INPUTS);
+ conf.set(CRUNCH_INPUTS, existing == null ? inputs : existing + RECORD_SEP + inputs);
+ }
+
+ public static Map<FormatBundle, Map<Integer, List<Path>>> getFormatNodeMap(JobContext job) {
+ Map<FormatBundle, Map<Integer, List<Path>>> formatNodeMap = Maps.newHashMap();
+ Configuration conf = job.getConfiguration();
+ for (String input : Splitter.on(RECORD_SEP).split(conf.get(CRUNCH_INPUTS))) {
+ List<String> fields = Lists.newArrayList(SPLITTER.split(input));
+ FormatBundle<InputFormat> inputBundle = FormatBundle.fromSerialized(fields.get(0), InputFormat.class);
+ if (!formatNodeMap.containsKey(inputBundle)) {
+ formatNodeMap.put(inputBundle, Maps.<Integer, List<Path>> newHashMap());
+ }
+ Integer nodeIndex = Integer.valueOf(fields.get(1));
+ if (!formatNodeMap.get(inputBundle).containsKey(nodeIndex)) {
+ formatNodeMap.get(inputBundle).put(nodeIndex, Lists.<Path> newLinkedList());
+ }
+ formatNodeMap.get(inputBundle).get(nodeIndex).add(new Path(fields.get(2)));
+ }
+ return formatNodeMap;
+ }
+
+}