You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:13 UTC

[11/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
deleted file mode 100644
index f22b5a1..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.plan;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.Target;
-import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
-import org.apache.crunch.impl.mr.collect.DoTableImpl;
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
-import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
-import org.apache.crunch.impl.mr.exec.CrunchJobHooks;
-import org.apache.crunch.impl.mr.run.CrunchCombiner;
-import org.apache.crunch.impl.mr.run.CrunchInputFormat;
-import org.apache.crunch.impl.mr.run.CrunchMapper;
-import org.apache.crunch.impl.mr.run.CrunchReducer;
-import org.apache.crunch.impl.mr.run.NodeContext;
-import org.apache.crunch.impl.mr.run.RTNode;
-import org.apache.crunch.util.DistCache;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-class JobPrototype {
-
-  public static JobPrototype createMapReduceJob(int jobID, PGroupedTableImpl<?, ?> group,
-      Set<NodePath> inputs, Path workingPath) {
-    return new JobPrototype(jobID, inputs, group, workingPath);
-  }
-
-  public static JobPrototype createMapOnlyJob(int jobID, HashMultimap<Target, NodePath> mapNodePaths, Path workingPath) {
-    return new JobPrototype(jobID, mapNodePaths, workingPath);
-  }
-
-  private final int jobID; // TODO: maybe stageID sounds better
-  private final Set<NodePath> mapNodePaths;
-  private final PGroupedTableImpl<?, ?> group;
-  private final Set<JobPrototype> dependencies = Sets.newHashSet();
-  private final Map<PCollectionImpl<?>, DoNode> nodes = Maps.newHashMap();
-  private final Path workingPath;
-
-  private HashMultimap<Target, NodePath> targetsToNodePaths;
-  private DoTableImpl<?, ?> combineFnTable;
-
-  private CrunchControlledJob job;
-
-  private JobPrototype(int jobID, Set<NodePath> inputs, PGroupedTableImpl<?, ?> group, Path workingPath) {
-    this.jobID = jobID;
-    this.mapNodePaths = ImmutableSet.copyOf(inputs);
-    this.group = group;
-    this.workingPath = workingPath;
-    this.targetsToNodePaths = null;
-  }
-
-  private JobPrototype(int jobID, HashMultimap<Target, NodePath> outputPaths, Path workingPath) {
-    this.jobID = jobID;
-    this.group = null;
-    this.mapNodePaths = null;
-    this.workingPath = workingPath;
-    this.targetsToNodePaths = outputPaths;
-  }
-
-  public int getJobID() {
-    return jobID;
-  }
-
-  public boolean isMapOnly() {
-    return this.group == null;
-  }
-
-  Set<NodePath> getMapNodePaths() {
-    return mapNodePaths;
-  }
-
-  PGroupedTableImpl<?, ?> getGroupingTable() {
-    return group;
-  }
-
-  HashMultimap<Target, NodePath> getTargetsToNodePaths() {
-    return targetsToNodePaths;
-  }
-
-  public void addReducePaths(HashMultimap<Target, NodePath> outputPaths) {
-    if (group == null) {
-      throw new IllegalStateException("Cannot add a reduce phase to a map-only job");
-    }
-    this.targetsToNodePaths = outputPaths;
-  }
-
-  public void addDependency(JobPrototype dependency) {
-    this.dependencies.add(dependency);
-  }
-
-  public CrunchControlledJob getCrunchJob(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException {
-    if (job == null) {
-      job = build(jarClass, conf, pipeline);
-      for (JobPrototype proto : dependencies) {
-        job.addDependingJob(proto.getCrunchJob(jarClass, conf, pipeline));
-      }
-    }
-    return job;
-  }
-
-  private CrunchControlledJob build(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException {
-    Job job = new Job(conf);
-    conf = job.getConfiguration();
-    conf.set(PlanningParameters.CRUNCH_WORKING_DIRECTORY, workingPath.toString());
-    job.setJarByClass(jarClass);
-
-    Set<DoNode> outputNodes = Sets.newHashSet();
-    Set<Target> targets = targetsToNodePaths.keySet();
-    Path outputPath = new Path(workingPath, "output");
-    MSCROutputHandler outputHandler = new MSCROutputHandler(job, outputPath, group == null);
-    for (Target target : targets) {
-      DoNode node = null;
-      for (NodePath nodePath : targetsToNodePaths.get(target)) {
-        if (node == null) {
-          PCollectionImpl<?> collect = nodePath.tail();
-          node = DoNode.createOutputNode(target.toString(), collect.getPType());
-          outputHandler.configureNode(node, target);
-        }
-        outputNodes.add(walkPath(nodePath.descendingIterator(), node));
-      }
-    }
-
-    job.setMapperClass(CrunchMapper.class);
-    List<DoNode> inputNodes;
-    DoNode reduceNode = null;
-    if (group != null) {
-      job.setReducerClass(CrunchReducer.class);
-      List<DoNode> reduceNodes = Lists.newArrayList(outputNodes);
-      serialize(reduceNodes, conf, workingPath, NodeContext.REDUCE);
-      reduceNode = reduceNodes.get(0);
-
-      if (combineFnTable != null) {
-        job.setCombinerClass(CrunchCombiner.class);
-        DoNode combinerInputNode = group.createDoNode();
-        DoNode combineNode = combineFnTable.createDoNode();
-        combineNode.addChild(group.getGroupingNode());
-        combinerInputNode.addChild(combineNode);
-        serialize(ImmutableList.of(combinerInputNode), conf, workingPath, NodeContext.COMBINE);
-      }
-
-      group.configureShuffle(job);
-
-      DoNode mapOutputNode = group.getGroupingNode();
-      Set<DoNode> mapNodes = Sets.newHashSet();
-      for (NodePath nodePath : mapNodePaths) {
-        // Advance these one step, since we've already configured
-        // the grouping node, and the PGroupedTableImpl is the tail
-        // of the NodePath.
-        Iterator<PCollectionImpl<?>> iter = nodePath.descendingIterator();
-        iter.next();
-        mapNodes.add(walkPath(iter, mapOutputNode));
-      }
-      inputNodes = Lists.newArrayList(mapNodes);
-    } else { // No grouping
-      job.setNumReduceTasks(0);
-      inputNodes = Lists.newArrayList(outputNodes);
-    }
-    serialize(inputNodes, conf, workingPath, NodeContext.MAP);
-
-    if (inputNodes.size() == 1) {
-      DoNode inputNode = inputNodes.get(0);
-      inputNode.getSource().configureSource(job, -1);
-    } else {
-      for (int i = 0; i < inputNodes.size(); i++) {
-        DoNode inputNode = inputNodes.get(i);
-        inputNode.getSource().configureSource(job, i);
-      }
-      job.setInputFormatClass(CrunchInputFormat.class);
-    }
-    job.setJobName(createJobName(pipeline.getName(), inputNodes, reduceNode));
-
-    return new CrunchControlledJob(
-        jobID,
-        job,
-        new CrunchJobHooks.PrepareHook(job),
-        new CrunchJobHooks.CompletionHook(job, outputPath, outputHandler.getMultiPaths(), group == null));
-  }
-
-  private void serialize(List<DoNode> nodes, Configuration conf, Path workingPath, NodeContext context)
-      throws IOException {
-    List<RTNode> rtNodes = Lists.newArrayList();
-    for (DoNode node : nodes) {
-      rtNodes.add(node.toRTNode(true, conf, context));
-    }
-    Path path = new Path(workingPath, context.toString());
-    DistCache.write(conf, path, rtNodes);
-  }
-
-  private String createJobName(String pipelineName, List<DoNode> mapNodes, DoNode reduceNode) {
-    JobNameBuilder builder = new JobNameBuilder(pipelineName);
-    builder.visit(mapNodes);
-    if (reduceNode != null) {
-      builder.visit(reduceNode);
-    }
-    return builder.build();
-  }
-
-  private DoNode walkPath(Iterator<PCollectionImpl<?>> iter, DoNode working) {
-    while (iter.hasNext()) {
-      PCollectionImpl<?> collect = iter.next();
-      if (combineFnTable != null && !(collect instanceof PGroupedTableImpl)) {
-        combineFnTable = null;
-      } else if (collect instanceof DoTableImpl && ((DoTableImpl<?, ?>) collect).hasCombineFn()) {
-        combineFnTable = (DoTableImpl<?, ?>) collect;
-      }
-      if (!nodes.containsKey(collect)) {
-        nodes.put(collect, collect.createDoNode());
-      }
-      DoNode parent = nodes.get(collect);
-      parent.addChild(working);
-      working = parent;
-    }
-    return working;
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
deleted file mode 100644
index 36c565e..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.plan;
-
-import java.util.Map;
-
-import org.apache.crunch.Target;
-import org.apache.crunch.io.MapReduceTarget;
-import org.apache.crunch.io.OutputHandler;
-import org.apache.crunch.io.PathTarget;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
-import com.google.common.collect.Maps;
-
-public class MSCROutputHandler implements OutputHandler {
-
-  private final Job job;
-  private final Path path;
-  private final boolean mapOnlyJob;
-
-  private DoNode workingNode;
-  private Map<Integer, PathTarget> multiPaths;
-  private int jobCount;
-
-  public MSCROutputHandler(Job job, Path outputPath, boolean mapOnlyJob) {
-    this.job = job;
-    this.path = outputPath;
-    this.mapOnlyJob = mapOnlyJob;
-    this.multiPaths = Maps.newHashMap();
-  }
-
-  public void configureNode(DoNode node, Target target) {
-    workingNode = node;
-    target.accept(this, node.getPType());
-  }
-
-  public boolean configure(Target target, PType<?> ptype) {
-    if (target instanceof MapReduceTarget) {
-      if (target instanceof PathTarget) {
-        multiPaths.put(jobCount, (PathTarget) target);
-      }
-
-      String name = PlanningParameters.MULTI_OUTPUT_PREFIX + jobCount;
-      jobCount++;
-      workingNode.setOutputName(name);
-      ((MapReduceTarget) target).configureForMapReduce(job, ptype, path, name);
-      return true;
-    }
-
-    return false;
-  }
-
-  public boolean isMapOnlyJob() {
-    return mapOnlyJob;
-  }
-
-  public Map<Integer, PathTarget> getMultiPaths() {
-    return multiPaths;
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
deleted file mode 100644
index 3e1de38..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.plan;
-
-import java.io.IOException;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.Target;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.impl.mr.collect.InputCollection;
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
-import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
-import org.apache.crunch.impl.mr.exec.MRExecutor;
-import org.apache.crunch.materialize.MaterializableIterable;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-
-public class MSCRPlanner {
-
-  private final MRPipeline pipeline;
-  private final Map<PCollectionImpl<?>, Set<Target>> outputs;
-  private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
-  private int lastJobID = 0;
-
-  public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs,
-      Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize) {
-    this.pipeline = pipeline;
-    this.outputs = new TreeMap<PCollectionImpl<?>, Set<Target>>(DEPTH_COMPARATOR);
-    this.outputs.putAll(outputs);
-    this.toMaterialize = toMaterialize;
-  }
-
-  // Used to ensure that we always build pipelines starting from the deepest
-  // outputs, which helps ensure that we handle intermediate outputs correctly.
-  private static final Comparator<PCollectionImpl<?>> DEPTH_COMPARATOR = new Comparator<PCollectionImpl<?>>() {
-    @Override
-    public int compare(PCollectionImpl<?> left, PCollectionImpl<?> right) {
-      int cmp = right.getDepth() - left.getDepth();
-      if (cmp == 0) {
-        // Ensure we don't throw away two output collections at the same depth.
-        // Using the collection name would be nicer here, but names aren't
-        // necessarily unique.
-        cmp = new Integer(right.hashCode()).compareTo(left.hashCode());
-      }
-      return cmp;
-    }
-  };  
-
-  public MRExecutor plan(Class<?> jarClass, Configuration conf) throws IOException {
-    Map<PCollectionImpl<?>, Set<SourceTarget<?>>> targetDeps = Maps.newTreeMap(DEPTH_COMPARATOR);
-    for (PCollectionImpl<?> pcollect : outputs.keySet()) {
-      targetDeps.put(pcollect, pcollect.getTargetDependencies());
-    }
-    
-    Multimap<Vertex, JobPrototype> assignments = HashMultimap.create();
-    Multimap<PCollectionImpl<?>, Vertex> protoDependency = HashMultimap.create();
-    while (!targetDeps.isEmpty()) {
-      Set<Target> allTargets = Sets.newHashSet();
-      for (PCollectionImpl<?> pcollect : targetDeps.keySet()) {
-        allTargets.addAll(outputs.get(pcollect));
-      }
-      GraphBuilder graphBuilder = new GraphBuilder();
-      
-      // Walk the current plan tree and build a graph in which the vertices are
-      // sources, targets, and GBK operations.
-      Set<PCollectionImpl<?>> currentStage = Sets.newHashSet();
-      Set<PCollectionImpl<?>> laterStage = Sets.newHashSet();
-      for (PCollectionImpl<?> output : targetDeps.keySet()) {
-        if (Sets.intersection(allTargets, targetDeps.get(output)).isEmpty()) {
-          graphBuilder.visitOutput(output);
-          currentStage.add(output);
-        } else {
-          laterStage.add(output);
-        }
-      }
-      
-      Graph baseGraph = graphBuilder.getGraph();
-      
-      // Create a new graph that splits up dependent GBK nodes.
-      Graph graph = prepareFinalGraph(baseGraph);
-      
-      // Break the graph up into connected components.
-      List<List<Vertex>> components = graph.connectedComponents();
-      
-      // For each component, we will create one or more job prototypes,
-      // depending on its profile.
-      // For dependency handling, we only need to care about which
-      // job prototype a particular GBK is assigned to.
-      for (List<Vertex> component : components) {
-        assignments.putAll(constructJobPrototypes(component));
-      }
-
-      // Add in the job dependency information here.
-      for (Map.Entry<Vertex, JobPrototype> e : assignments.entries()) {
-        JobPrototype current = e.getValue();
-        List<Vertex> parents = graph.getParents(e.getKey());
-        for (Vertex parent : parents) {
-          for (JobPrototype parentJobProto : assignments.get(parent)) {
-            current.addDependency(parentJobProto);
-          }
-        }
-      }
-      
-      // Add cross-stage dependencies.
-      for (PCollectionImpl<?> output : currentStage) {
-        Set<Target> targets = outputs.get(output);
-        Vertex vertex = graph.getVertexAt(output);
-        for (PCollectionImpl<?> later : laterStage) {
-          if (!Sets.intersection(targets, targetDeps.get(later)).isEmpty()) {
-            protoDependency.put(later, vertex);
-          }
-        }
-        targetDeps.remove(output);
-      }
-    }
-    
-    // Cross-job dependencies.
-    for (Entry<PCollectionImpl<?>, Vertex> pd : protoDependency.entries()) {
-      Vertex d = new Vertex(pd.getKey());
-      Vertex dj = pd.getValue();
-      for (JobPrototype parent : assignments.get(dj)) {
-        for (JobPrototype child : assignments.get(d)) {
-          child.addDependency(parent);
-        }
-      }
-    }
-    
-    // Finally, construct the jobs from the prototypes and return.
-    DotfileWriter dotfileWriter = new DotfileWriter();
-    MRExecutor exec = new MRExecutor(jarClass, outputs, toMaterialize);
-    for (JobPrototype proto : Sets.newHashSet(assignments.values())) {
-      dotfileWriter.addJobPrototype(proto);
-      exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline));
-    }
-
-    String planDotFile = dotfileWriter.buildDotfile();
-    exec.setPlanDotFile(planDotFile);
-    conf.set(PlanningParameters.PIPELINE_PLAN_DOTFILE, planDotFile);
-
-    return exec;
-  }
-  
-  private Graph prepareFinalGraph(Graph baseGraph) {
-    Graph graph = new Graph();
-    
-    for (Vertex baseVertex : baseGraph) {
-      // Add all of the vertices in the base graph, but no edges (yet).
-      graph.addVertex(baseVertex.getPCollection(), baseVertex.isOutput());
-    }
-    
-    for (Edge e : baseGraph.getAllEdges()) {
-      // Add back all of the edges, except those that connect two GBKs and
-      // those where an output feeds into a GBK.
-      if (!(e.getHead().isGBK() && e.getTail().isGBK()) &&
-          !(e.getHead().isOutput() && e.getTail().isGBK())) {
-        Vertex head = graph.getVertexAt(e.getHead().getPCollection());
-        Vertex tail = graph.getVertexAt(e.getTail().getPCollection());
-        graph.getEdge(head, tail).addAllNodePaths(e.getNodePaths());
-      }
-    }
-    
-    for (Vertex baseVertex : baseGraph) {
-      if (baseVertex.isGBK()) {
-        Vertex vertex = graph.getVertexAt(baseVertex.getPCollection());
-        for (Edge e : baseVertex.getIncomingEdges()) {
-          if (e.getHead().isOutput()) {
-            // Execute an edge split.
-            Vertex splitTail = e.getHead();
-            PCollectionImpl<?> split = splitTail.getPCollection();
-            InputCollection<?> inputNode = handleSplitTarget(split);
-            Vertex splitHead = graph.addVertex(inputNode, false);
-            
-            // Divide up the node paths in the edge between the two GBK nodes so
-            // that each node is either owned by GBK1 -> newTail or newHead -> GBK2.
-            for (NodePath path : e.getNodePaths()) {
-              NodePath headPath = path.splitAt(split, splitHead.getPCollection());
-              graph.getEdge(vertex, splitTail).addNodePath(headPath);
-              graph.getEdge(splitHead, vertex).addNodePath(path);
-            }
-            
-            // Note the dependency between the vertices in the graph.
-            graph.markDependency(splitHead, splitTail);
-          } else if (!e.getHead().isGBK()) {
-            Vertex newHead = graph.getVertexAt(e.getHead().getPCollection());
-            graph.getEdge(newHead, vertex).addAllNodePaths(e.getNodePaths());
-          }
-        }
-        for (Edge e : baseVertex.getOutgoingEdges()) {
-          if (!e.getTail().isGBK()) {
-            Vertex newTail = graph.getVertexAt(e.getTail().getPCollection());
-            graph.getEdge(vertex, newTail).addAllNodePaths(e.getNodePaths());
-          } else {
-            // Execute an Edge split
-            Vertex newGraphTail = graph.getVertexAt(e.getTail().getPCollection());
-            PCollectionImpl split = e.getSplit();
-            InputCollection<?> inputNode = handleSplitTarget(split);
-            Vertex splitTail = graph.addVertex(split, true);
-            Vertex splitHead = graph.addVertex(inputNode, false);
-            
-            // Divide up the node paths in the edge between the two GBK nodes so
-            // that each node is either owned by GBK1 -> newTail or newHead -> GBK2.
-            for (NodePath path : e.getNodePaths()) {
-              NodePath headPath = path.splitAt(split, splitHead.getPCollection());
-              graph.getEdge(vertex, splitTail).addNodePath(headPath);
-              graph.getEdge(splitHead, newGraphTail).addNodePath(path);
-            }
-            
-            // Note the dependency between the vertices in the graph.
-            graph.markDependency(splitHead, splitTail);
-          }
-        }
-      }
-    }
-    
-    return graph;
-  }
-  
-  private Multimap<Vertex, JobPrototype> constructJobPrototypes(List<Vertex> component) {
-    Multimap<Vertex, JobPrototype> assignment = HashMultimap.create();
-    List<Vertex> gbks = Lists.newArrayList();
-    for (Vertex v : component) {
-      if (v.isGBK()) {
-        gbks.add(v);
-      }
-    }
-
-    if (gbks.isEmpty()) {
-      HashMultimap<Target, NodePath> outputPaths = HashMultimap.create();
-      for (Vertex v : component) {
-        if (v.isInput()) {
-          for (Edge e : v.getOutgoingEdges()) {
-            for (NodePath nodePath : e.getNodePaths()) {
-              PCollectionImpl target = nodePath.tail();
-              for (Target t : outputs.get(target)) {
-                outputPaths.put(t, nodePath);
-              }
-            }
-          }
-        }
-      }
-      if (outputPaths.isEmpty()) {
-        throw new IllegalStateException("No outputs?");
-      }
-      JobPrototype prototype = JobPrototype.createMapOnlyJob(
-          ++lastJobID, outputPaths, pipeline.createTempPath());
-      for (Vertex v : component) {
-        assignment.put(v, prototype);
-      }
-    } else {
-      Set<Edge> usedEdges = Sets.newHashSet();
-      for (Vertex g : gbks) {
-        Set<NodePath> inputs = Sets.newHashSet();
-        for (Edge e : g.getIncomingEdges()) {
-          inputs.addAll(e.getNodePaths());
-          usedEdges.add(e);
-        }
-        JobPrototype prototype = JobPrototype.createMapReduceJob(
-            ++lastJobID, (PGroupedTableImpl) g.getPCollection(), inputs, pipeline.createTempPath());
-        assignment.put(g, prototype);
-        for (Edge e : g.getIncomingEdges()) {
-          assignment.put(e.getHead(), prototype);
-          usedEdges.add(e);
-        }
-        HashMultimap<Target, NodePath> outputPaths = HashMultimap.create();
-        for (Edge e : g.getOutgoingEdges()) {
-          Vertex output = e.getTail();
-          for (Target t : outputs.get(output.getPCollection())) {
-            outputPaths.putAll(t, e.getNodePaths());
-          }
-          assignment.put(output, prototype);
-          usedEdges.add(e);
-        }
-        prototype.addReducePaths(outputPaths);
-      }
-      
-      // Check for any un-assigned vertices, which should be map-side outputs
-      // that we will need to run in a map-only job.
-      HashMultimap<Target, NodePath> outputPaths = HashMultimap.create();
-      Set<Vertex> orphans = Sets.newHashSet();
-      for (Vertex v : component) {
-
-        // Check if this vertex has multiple inputs but only a subset of
-        // them have already been assigned
-        boolean vertexHasUnassignedIncomingEdges = false;
-        if (v.isOutput()) {
-          for (Edge e : v.getIncomingEdges()) {
-            if (!usedEdges.contains(e)) {
-              vertexHasUnassignedIncomingEdges = true;
-            }
-          }
-        }
-
-        if (v.isOutput() && (vertexHasUnassignedIncomingEdges || !assignment.containsKey(v))) {
-          orphans.add(v);
-          for (Edge e : v.getIncomingEdges()) {
-            if (vertexHasUnassignedIncomingEdges && usedEdges.contains(e)) {
-              // We've already dealt with this incoming edge
-              continue;
-            }
-            orphans.add(e.getHead());
-            for (NodePath nodePath : e.getNodePaths()) {
-              PCollectionImpl target = nodePath.tail();
-              for (Target t : outputs.get(target)) {
-                outputPaths.put(t, nodePath);
-              }
-            }
-          }
-        }
-
-      }
-      if (!outputPaths.isEmpty()) {
-        JobPrototype prototype = JobPrototype.createMapOnlyJob(
-            ++lastJobID, outputPaths, pipeline.createTempPath());
-        for (Vertex orphan : orphans) {
-          assignment.put(orphan, prototype);
-        }
-      }
-    }
-    
-    return assignment;
-  }
-  
-  private InputCollection<?> handleSplitTarget(PCollectionImpl<?> splitTarget) {
-    if (!outputs.containsKey(splitTarget)) {
-      outputs.put(splitTarget, Sets.<Target> newHashSet());
-    }
-
-    SourceTarget srcTarget = null;
-    Target targetToReplace = null;
-    for (Target t : outputs.get(splitTarget)) {
-      if (t instanceof SourceTarget) {
-        srcTarget = (SourceTarget<?>) t;
-        break;
-      } else {
-        srcTarget = t.asSourceTarget(splitTarget.getPType());
-        if (srcTarget != null) {
-          targetToReplace = t;
-          break;
-        }
-      }
-    }
-    if (targetToReplace != null) {
-      outputs.get(splitTarget).remove(targetToReplace);
-    } else if (srcTarget == null) {
-      srcTarget = pipeline.createIntermediateOutput(splitTarget.getPType());
-    }
-    outputs.get(splitTarget).add(srcTarget);
-    splitTarget.materializeAt(srcTarget);
-
-    return (InputCollection<?>) pipeline.read(srcTarget);
-  }  
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
deleted file mode 100644
index a090d93..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.plan;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
-
-import com.google.common.collect.Lists;
-
-class NodePath implements Iterable<PCollectionImpl<?>> {
-  private LinkedList<PCollectionImpl<?>> path;
-
-  public NodePath() {
-    this.path = Lists.newLinkedList();
-  }
-
-  public NodePath(PCollectionImpl<?> tail) {
-    this.path = Lists.newLinkedList();
-    this.path.add(tail);
-  }
-
-  public NodePath(NodePath other) {
-    this.path = Lists.newLinkedList(other.path);
-  }
-
-  public void push(PCollectionImpl<?> stage) {
-    this.path.push((PCollectionImpl<?>) stage);
-  }
-
-  public NodePath close(PCollectionImpl<?> head) {
-    this.path.push(head);
-    return this;
-  }
-
-  public Iterator<PCollectionImpl<?>> iterator() {
-    return path.iterator();
-  }
-
-  public Iterator<PCollectionImpl<?>> descendingIterator() {
-    return path.descendingIterator();
-  }
-
-  public PCollectionImpl<?> get(int index) {
-    return path.get(index);
-  }
-
-  public PCollectionImpl<?> head() {
-    return path.peekFirst();
-  }
-
-  public PCollectionImpl<?> tail() {
-    return path.peekLast();
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other == null || !(other instanceof NodePath)) {
-      return false;
-    }
-    NodePath nodePath = (NodePath) other;
-    return path.equals(nodePath.path);
-  }
-
-  @Override
-  public int hashCode() {
-    return 17 + 37 * path.hashCode();
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    for (PCollectionImpl<?> collect : path) {
-      sb.append(collect.getName() + "|");
-    }
-    sb.deleteCharAt(sb.length() - 1);
-    return sb.toString();
-  }
-
-  public NodePath splitAt(int splitIndex, PCollectionImpl<?> newHead) {
-    NodePath top = new NodePath();
-    for (int i = 0; i <= splitIndex; i++) {
-      top.path.add(path.get(i));
-    }
-    LinkedList<PCollectionImpl<?>> nextPath = Lists.newLinkedList();
-    nextPath.add(newHead);
-    nextPath.addAll(path.subList(splitIndex + 1, path.size()));
-    path = nextPath;
-    return top;
-  }
-  
-  public NodePath splitAt(PCollectionImpl split, PCollectionImpl<?> newHead) {
-    NodePath top = new NodePath();
-    int splitIndex = 0;
-    for (PCollectionImpl p : path) {
-      top.path.add(p);
-      if (p == split) {
-        break;
-      }
-      splitIndex++;
-    }
-    LinkedList<PCollectionImpl<?>> nextPath = Lists.newLinkedList();
-    nextPath.add(newHead);
-    nextPath.addAll(path.subList(splitIndex + 1, path.size()));
-    path = nextPath;
-    return top;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
deleted file mode 100644
index b90a911..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.plan;
-
-/**
- * Collection of Configuration keys and various constants used when planning MapReduce jobs for a
- * pipeline.
- */
-public class PlanningParameters {
-
-  public static final String MULTI_OUTPUT_PREFIX = "out";
-
-  public static final String CRUNCH_WORKING_DIRECTORY = "crunch.work.dir";
-
-  /**
-   * Configuration key under which a <a href="http://www.graphviz.org">DOT</a> file containing the
-   * pipeline job graph is stored by the planner.
-   */
-  public static final String PIPELINE_PLAN_DOTFILE = "crunch.planner.dotfile";
-
-  private PlanningParameters() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
deleted file mode 100644
index f4aa668..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.plan;
-
-import java.util.List;
-import java.util.Set;
-
-import org.apache.commons.lang.builder.ReflectionToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-import org.apache.crunch.Source;
-import org.apache.crunch.impl.mr.collect.InputCollection;
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
-import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- *
- */
-class Vertex {
-  private final PCollectionImpl impl;
-  
-  private boolean output;
-  private Set<Edge> incoming;
-  private Set<Edge> outgoing;
-  
-  public Vertex(PCollectionImpl impl) {
-    this.impl = impl;
-    this.incoming = Sets.newHashSet();
-    this.outgoing = Sets.newHashSet();
-  }
-  
-  public PCollectionImpl getPCollection() {
-    return impl;
-  }
-  
-  public boolean isInput() {
-    return impl instanceof InputCollection;
-  }
-  
-  public boolean isGBK() {
-    return impl instanceof PGroupedTableImpl;
-  }
-  
-  public void setOutput() {
-    this.output = true;
-  }
-  
-  public boolean isOutput() {
-    return output;
-  }
-  
-  public Source getSource() {
-    if (isInput()) {
-      return ((InputCollection) impl).getSource();
-    }
-    return null;
-  }
-  
-  public void addIncoming(Edge edge) {
-    this.incoming.add(edge);
-  }
-  
-  public void addOutgoing(Edge edge) {
-    this.outgoing.add(edge);
-  }
-  
-  public List<Vertex> getAllNeighbors() {
-    List<Vertex> n = Lists.newArrayList();
-    for (Edge e : incoming) {
-      n.add(e.getHead());
-    }
-    for (Edge e : outgoing) {
-      n.add(e.getTail());
-    }
-    return n;
-  }
-  
-  public Set<Edge> getAllEdges() {
-    return Sets.union(incoming, outgoing);
-  }
-  
-  public Set<Edge> getIncomingEdges() {
-    return incoming;
-  }
-  
-  public Set<Edge> getOutgoingEdges() {
-    return outgoing;
-  }
-  
-  @Override
-  public boolean equals(Object obj) {
-    if (obj == null || !(obj instanceof Vertex)) {
-      return false;
-    }
-    Vertex other = (Vertex) obj;
-    return impl.equals(other.impl);
-  }
-  
-  @Override
-  public int hashCode() {
-    return 17 + 37 * impl.hashCode();
-  }
-
-  @Override
-  public String toString() {
-    return new ReflectionToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).setExcludeFieldNames(
-        new String[] { "outgoing", "incoming" }).toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java
deleted file mode 100644
index 47a3ded..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.run;
-
-public class CrunchCombiner extends CrunchReducer {
-
-  @Override
-  protected NodeContext getNodeContext() {
-    return NodeContext.COMBINE;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
deleted file mode 100644
index eb5dd8a..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.run;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.crunch.io.CrunchInputs;
-import org.apache.crunch.io.FormatBundle;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.google.common.collect.Lists;
-
-public class CrunchInputFormat<K, V> extends InputFormat<K, V> {
-
-  @Override
-  public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
-    List<InputSplit> splits = Lists.newArrayList();
-    Configuration base = job.getConfiguration();
-    Map<FormatBundle, Map<Integer, List<Path>>> formatNodeMap = CrunchInputs.getFormatNodeMap(job);
-
-    // First, build a map of InputFormats to Paths
-    for (Map.Entry<FormatBundle, Map<Integer, List<Path>>> entry : formatNodeMap.entrySet()) {
-      FormatBundle inputBundle = entry.getKey();
-      Configuration conf = new Configuration(base);
-      inputBundle.configure(conf);
-      Job jobCopy = new Job(conf);
-      InputFormat<?, ?> format = (InputFormat<?, ?>) ReflectionUtils.newInstance(inputBundle.getFormatClass(),
-          jobCopy.getConfiguration());
-      for (Map.Entry<Integer, List<Path>> nodeEntry : entry.getValue().entrySet()) {
-        Integer nodeIndex = nodeEntry.getKey();
-        List<Path> paths = nodeEntry.getValue();
-        FileInputFormat.setInputPaths(jobCopy, paths.toArray(new Path[paths.size()]));
-
-        // Get splits for each input path and tag with InputFormat
-        // and Mapper types by wrapping in a TaggedInputSplit.
-        List<InputSplit> pathSplits = format.getSplits(jobCopy);
-        for (InputSplit pathSplit : pathSplits) {
-          splits.add(new CrunchInputSplit(pathSplit, inputBundle.getFormatClass(),
-              nodeIndex, jobCopy.getConfiguration()));
-        }
-      }
-    }
-    return splits;
-  }
-
-  @Override
-  public RecordReader<K, V> createRecordReader(InputSplit inputSplit, TaskAttemptContext context) throws IOException,
-      InterruptedException {
-    return new CrunchRecordReader<K, V>(inputSplit, context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
deleted file mode 100644
index b41062b..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.run;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.util.ReflectionUtils;
-
-class CrunchInputSplit extends InputSplit implements Writable {
-
-  private InputSplit inputSplit;
-  private Class<? extends InputFormat<?, ?>> inputFormatClass;
-  private int nodeIndex;
-  private Configuration conf;
-
-  public CrunchInputSplit() {
-    // default constructor
-  }
-
-  public CrunchInputSplit(
-      InputSplit inputSplit,
-      Class<? extends InputFormat<?, ?>> inputFormatClass,
-      int nodeIndex,
-      Configuration conf) {
-    this.inputSplit = inputSplit;
-    this.inputFormatClass = inputFormatClass;
-    this.nodeIndex = nodeIndex;
-    this.conf = conf;
-  }
-
-  public Configuration getConf() {
-    return conf;
-  }
-  
-  public int getNodeIndex() {
-    return nodeIndex;
-  }
-
-  public InputSplit getInputSplit() {
-    return inputSplit;
-  }
-
-  public Class<? extends InputFormat<?, ?>> getInputFormatClass() {
-    return inputFormatClass;
-  }
-
-  @Override
-  public long getLength() throws IOException, InterruptedException {
-    return inputSplit.getLength();
-  }
-
-  @Override
-  public String[] getLocations() throws IOException, InterruptedException {
-    return inputSplit.getLocations();
-  }
-
-  public void readFields(DataInput in) throws IOException {
-    nodeIndex = in.readInt();
-    conf = new Configuration();
-    conf.readFields(in);
-    inputFormatClass = (Class<? extends InputFormat<?, ?>>) readClass(in);
-    Class<? extends InputSplit> inputSplitClass = (Class<? extends InputSplit>) readClass(in);
-    inputSplit = (InputSplit) ReflectionUtils.newInstance(inputSplitClass, conf);
-    SerializationFactory factory = new SerializationFactory(conf);
-    Deserializer deserializer = factory.getDeserializer(inputSplitClass);
-    deserializer.open((DataInputStream) in);
-    inputSplit = (InputSplit) deserializer.deserialize(inputSplit);
-  }
-
-  private Class<?> readClass(DataInput in) throws IOException {
-    String className = Text.readString(in);
-    try {
-      return conf.getClassByName(className);
-    } catch (ClassNotFoundException e) {
-      throw new RuntimeException("readObject can't find class", e);
-    }
-  }
-
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(nodeIndex);
-    conf.write(out);
-    Text.writeString(out, inputFormatClass.getName());
-    Text.writeString(out, inputSplit.getClass().getName());
-    SerializationFactory factory = new SerializationFactory(conf);
-    Serializer serializer = factory.getSerializer(inputSplit.getClass());
-    serializer.open((DataOutputStream) out);
-    serializer.serialize(inputSplit);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
deleted file mode 100644
index 70f0b01..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.run;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.hadoop.mapreduce.Mapper;
-
-public class CrunchMapper extends Mapper<Object, Object, Object, Object> {
-
-  private static final Log LOG = LogFactory.getLog(CrunchMapper.class);
-
-  private RTNode node;
-  private CrunchTaskContext ctxt;
-  private boolean debug;
-
-  @Override
-  protected void setup(Mapper<Object, Object, Object, Object>.Context context) {
-    List<RTNode> nodes;
-    this.ctxt = new CrunchTaskContext(context, NodeContext.MAP);
-    try {
-      nodes = ctxt.getNodes();
-    } catch (IOException e) {
-      LOG.info("Crunch deserialization error", e);
-      throw new CrunchRuntimeException(e);
-    }
-    if (nodes.size() == 1) {
-      this.node = nodes.get(0);
-    } else {
-      CrunchInputSplit split = (CrunchInputSplit) context.getInputSplit();
-      this.node = nodes.get(split.getNodeIndex());
-    }
-    this.debug = ctxt.isDebugRun();
-  }
-
-  @Override
-  protected void map(Object k, Object v, Mapper<Object, Object, Object, Object>.Context context) {
-    if (debug) {
-      try {
-        node.process(k, v);
-      } catch (Exception e) {
-        LOG.error("Mapper exception", e);
-      }
-    } else {
-      node.process(k, v);
-    }
-  }
-
-  @Override
-  protected void cleanup(Mapper<Object, Object, Object, Object>.Context context) {
-    node.cleanup();
-    ctxt.cleanup();
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
deleted file mode 100644
index fc8fb32..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.run;
-
-import java.io.IOException;
-
-import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.ReflectionUtils;
-
-class CrunchRecordReader<K, V> extends RecordReader<K, V> {
-
-  private final RecordReader<K, V> delegate;
-
-  public CrunchRecordReader(InputSplit inputSplit, final TaskAttemptContext context) throws IOException,
-      InterruptedException {
-    CrunchInputSplit crunchSplit = (CrunchInputSplit) inputSplit;
-    InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils.newInstance(crunchSplit.getInputFormatClass(),
-        crunchSplit.getConf());
-    this.delegate = inputFormat.createRecordReader(crunchSplit.getInputSplit(),
-        TaskAttemptContextFactory.create(crunchSplit.getConf(), context.getTaskAttemptID()));
-  }
-
-  @Override
-  public void close() throws IOException {
-    delegate.close();
-  }
-
-  @Override
-  public K getCurrentKey() throws IOException, InterruptedException {
-    return delegate.getCurrentKey();
-  }
-
-  @Override
-  public V getCurrentValue() throws IOException, InterruptedException {
-    return delegate.getCurrentValue();
-  }
-
-  @Override
-  public float getProgress() throws IOException, InterruptedException {
-    return delegate.getProgress();
-  }
-
-  @Override
-  public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
-    CrunchInputSplit crunchSplit = (CrunchInputSplit) inputSplit;
-    InputSplit delegateSplit = crunchSplit.getInputSplit();
-    delegate.initialize(delegateSplit,
-        TaskAttemptContextFactory.create(crunchSplit.getConf(), context.getTaskAttemptID()));
-  }
-
-  @Override
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    return delegate.nextKeyValue();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
deleted file mode 100644
index e5ddbd2..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.run;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.impl.SingleUseIterable;
-import org.apache.hadoop.mapreduce.Reducer;
-
-public class CrunchReducer extends Reducer<Object, Object, Object, Object> {
-
-  private static final Log LOG = LogFactory.getLog(CrunchReducer.class);
-
-  private RTNode node;
-  private CrunchTaskContext ctxt;
-  private boolean debug;
-
-  protected NodeContext getNodeContext() {
-    return NodeContext.REDUCE;
-  }
-
-  @Override
-  protected void setup(Reducer<Object, Object, Object, Object>.Context context) {
-    this.ctxt = new CrunchTaskContext(context, getNodeContext());
-    try {
-      List<RTNode> nodes = ctxt.getNodes();
-      this.node = nodes.get(0);
-    } catch (IOException e) {
-      LOG.info("Crunch deserialization error", e);
-      throw new CrunchRuntimeException(e);
-    }
-    this.debug = ctxt.isDebugRun();
-  }
-
-  @Override
-  protected void reduce(Object key, Iterable<Object> values, Reducer<Object, Object, Object, Object>.Context context) {
-    values = new SingleUseIterable<Object>(values);
-    if (debug) {
-      try {
-        node.processIterable(key, values);
-      } catch (Exception e) {
-        LOG.error("Reducer exception", e);
-      }
-    } else {
-      node.processIterable(key, values);
-    }
-  }
-
-  @Override
-  protected void cleanup(Reducer<Object, Object, Object, Object>.Context context) {
-    node.cleanup();
-    ctxt.cleanup();
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
deleted file mode 100644
index c4f2873..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.run;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.impl.mr.plan.PlanningParameters;
-import org.apache.crunch.io.CrunchOutputs;
-import org.apache.crunch.util.DistCache;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-class CrunchTaskContext {
-
-  private final TaskInputOutputContext<Object, Object, Object, Object> taskContext;
-  private final NodeContext nodeContext;
-  private CrunchOutputs<Object, Object> multipleOutputs;
-
-  public CrunchTaskContext(TaskInputOutputContext<Object, Object, Object, Object> taskContext, NodeContext nodeContext) {
-    this.taskContext = taskContext;
-    this.nodeContext = nodeContext;
-  }
-
-  public TaskInputOutputContext<Object, Object, Object, Object> getContext() {
-    return taskContext;
-  }
-
-  public NodeContext getNodeContext() {
-    return nodeContext;
-  }
-
-  public List<RTNode> getNodes() throws IOException {
-    Configuration conf = taskContext.getConfiguration();
-    Path path = new Path(new Path(conf.get(PlanningParameters.CRUNCH_WORKING_DIRECTORY)), nodeContext.toString());
-    @SuppressWarnings("unchecked")
-    List<RTNode> nodes = (List<RTNode>) DistCache.read(conf, path);
-    if (nodes != null) {
-      for (RTNode node : nodes) {
-        node.initialize(this);
-      }
-    }
-    return nodes;
-  }
-
-  public boolean isDebugRun() {
-    Configuration conf = taskContext.getConfiguration();
-    return conf.getBoolean(RuntimeParameters.DEBUG, false);
-  }
-
-  public void cleanup() {
-    if (multipleOutputs != null) {
-      try {
-        multipleOutputs.close();
-      } catch (IOException e) {
-        throw new CrunchRuntimeException(e);
-      } catch (InterruptedException e) {
-        throw new CrunchRuntimeException(e);
-      }
-    }
-  }
-
-  public CrunchOutputs<Object, Object> getMultipleOutputs() {
-    if (multipleOutputs == null) {
-      multipleOutputs = new CrunchOutputs<Object, Object>(taskContext);
-    }
-    return multipleOutputs;
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java
deleted file mode 100644
index ffc9e7c..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.run;
-
-import org.apache.crunch.impl.mr.plan.DoNode;
-
-/**
- * Enum that is associated with a serialized {@link DoNode} instance, so we know
- * how to use it within the context of a particular MR job.
- * 
- */
-public enum NodeContext {
-  MAP,
-  REDUCE,
-  COMBINE;
-
-  public String getConfigurationKey() {
-    return "crunch.donode." + toString().toLowerCase();
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
deleted file mode 100644
index ce7b795..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.run;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.impl.mr.emit.IntermediateEmitter;
-import org.apache.crunch.impl.mr.emit.MultipleOutputEmitter;
-import org.apache.crunch.impl.mr.emit.OutputEmitter;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.PType;
-
-public class RTNode implements Serializable {
-
-  private static final Log LOG = LogFactory.getLog(RTNode.class);
-
-  private final String nodeName;
-  private DoFn<Object, Object> fn;
-  private PType<Object> outputPType;
-  private final List<RTNode> children;
-  private final Converter inputConverter;
-  private final Converter outputConverter;
-  private final String outputName;
-
-  private transient Emitter<Object> emitter;
-
-  public RTNode(DoFn<Object, Object> fn, PType<Object> outputPType, String name, List<RTNode> children,
-      Converter inputConverter,
-      Converter outputConverter, String outputName) {
-    this.fn = fn;
-    this.outputPType = outputPType;
-    this.nodeName = name;
-    this.children = children;
-    this.inputConverter = inputConverter;
-    this.outputConverter = outputConverter;
-    this.outputName = outputName;
-  }
-
-  public void initialize(CrunchTaskContext ctxt) {
-    if (emitter != null) {
-      // Already initialized
-      return;
-    }
-
-    fn.setContext(ctxt.getContext());
-    fn.initialize();
-    for (RTNode child : children) {
-      child.initialize(ctxt);
-    }
-
-    if (outputConverter != null) {
-      if (outputName != null) {
-        this.emitter = new MultipleOutputEmitter(outputConverter, ctxt.getMultipleOutputs(),
-            outputName);
-      } else {
-        this.emitter = new OutputEmitter(outputConverter, ctxt.getContext());
-      }
-    } else if (!children.isEmpty()) {
-      this.emitter = new IntermediateEmitter(outputPType, children,
-          ctxt.getContext().getConfiguration());
-    } else {
-      throw new CrunchRuntimeException("Invalid RTNode config: no emitter for: " + nodeName);
-    }
-  }
-
-  public boolean isLeafNode() {
-    return outputConverter != null && children.isEmpty();
-  }
-
-  public void process(Object input) {
-    try {
-      fn.process(input, emitter);
-    } catch (CrunchRuntimeException e) {
-      if (!e.wasLogged()) {
-        LOG.info(String.format("Crunch exception in '%s' for input: %s", nodeName, input.toString()), e);
-        e.markLogged();
-      }
-      throw e;
-    }
-  }
-
-  public void process(Object key, Object value) {
-    process(inputConverter.convertInput(key, value));
-  }
-
-  public void processIterable(Object key, Iterable values) {
-    process(inputConverter.convertIterableInput(key, values));
-  }
-
-  public void cleanup() {
-    fn.cleanup(emitter);
-    emitter.flush();
-    for (RTNode child : children) {
-      child.cleanup();
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "RTNode [nodeName=" + nodeName + ", fn=" + fn + ", children=" + children + ", inputConverter="
-        + inputConverter + ", outputConverter=" + outputConverter + ", outputName=" + outputName + "]";
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
deleted file mode 100644
index 604c49c..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.run;
-
-/**
- * Parameters used during the runtime execution.
- */
-public class RuntimeParameters {
-
-  public static final String AGGREGATOR_BUCKETS = "crunch.aggregator.buckets";
-
-  public static final String DEBUG = "crunch.debug";
-
-  public static final String TMP_DIR = "crunch.tmp.dir";
-
-  public static final String LOG_JOB_PROGRESS = "crunch.log.job.progress";
-
-  public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
-
-  // Not instantiated
-  private RuntimeParameters() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/At.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/At.java b/crunch/src/main/java/org/apache/crunch/io/At.java
deleted file mode 100644
index a6f0782..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/At.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.TableSourceTarget;
-import org.apache.crunch.io.avro.AvroFileSourceTarget;
-import org.apache.crunch.io.seq.SeqFileSourceTarget;
-import org.apache.crunch.io.seq.SeqFileTableSourceTarget;
-import org.apache.crunch.io.text.TextFileSourceTarget;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroType;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.crunch.types.writable.Writables;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-
-/**
- * <p>Static factory methods for creating common {@link SourceTarget} types, which may be treated as both a {@code Source}
- * and a {@code Target}.</p>
- * 
- * <p>The {@code At} methods are analogous to the {@link From} and {@link To} factory methods, but are used for
- * storing intermediate outputs that need to be passed from one run of a MapReduce pipeline to another run. The
- * {@code SourceTarget} object acts as both a {@code Source} and a {@code Target}, which enables it to provide this
- * functionality. For example:</p>
- * 
- * <pre>{@code
- *   Pipeline pipeline = new MRPipeline(this.getClass());
- *   // Create our intermediate storage location
- *   SourceTarget<String> intermediate = At.textFile("/temptext");
- *   ...
- *   // Write out the output of the first phase of a pipeline.
- *   pipeline.write(phase1, intermediate);
- *   
- *   // Explicitly call run to kick off the pipeline.
- *   pipeline.run();
- *   
- *   // And then kick off a second phase by consuming the output
- *   // from the first phase.
- *   PCollection<String> phase2Input = pipeline.read(intermediate);
- *   ...
- * }</pre>
- * 
- * 
- * <p>The {@code SourceTarget} abstraction is useful when we care about reading the intermediate
- * outputs of a pipeline as well as the final results.</p>
- */
-public class At {
-
-  /**
-   * Creates a {@code SourceTarget<T>} instance from the Avro file(s) at the given path name.
-   * 
-   * @param pathName The name of the path to the data on the filesystem
-   * @param avroClass The subclass of {@code SpecificRecord} to use for the Avro file
-   * @return A new {@code SourceTarget<T>} instance
-   */
-  public static <T extends SpecificRecord> SourceTarget<T> avroFile(String pathName, Class<T> avroClass) {
-    return avroFile(new Path(pathName), avroClass);  
-  }
-
-  /**
-   * Creates a {@code SourceTarget<T>} instance from the Avro file(s) at the given {@code Path}.
-   * 
-   * @param path The {@code Path} to the data
-   * @param avroClass The subclass of {@code SpecificRecord} to use for the Avro file
-   * @return A new {@code SourceTarget<T>} instance
-   */
-  public static <T extends SpecificRecord> SourceTarget<T> avroFile(Path path, Class<T> avroClass) {
-    return avroFile(path, Avros.specifics(avroClass));  
-  }
-  
-  /**
-   * Creates a {@code SourceTarget<T>} instance from the Avro file(s) at the given path name.
-   * 
-   * @param pathName The name of the path to the data on the filesystem
-   * @param avroType The {@code AvroType} for the Avro records
-   * @return A new {@code SourceTarget<T>} instance
-   */
-  public static <T> SourceTarget<T> avroFile(String pathName, AvroType<T> avroType) {
-    return avroFile(new Path(pathName), avroType);
-  }
-
-  /**
-   * Creates a {@code SourceTarget<T>} instance from the Avro file(s) at the given {@code Path}.
-   * 
-   * @param path The {@code Path} to the data
-   * @param avroType The {@code AvroType} for the Avro records
-   * @return A new {@code SourceTarget<T>} instance
-   */
-  public static <T> SourceTarget<T> avroFile(Path path, AvroType<T> avroType) {
-    return new AvroFileSourceTarget<T>(path, avroType);
-  }
-
-  /**
-   * Creates a {@code SourceTarget<T>} instance from the SequenceFile(s) at the given path name,
-   * using the value field of each key-value pair in the SequenceFile(s).
-   * 
-   * @param pathName The name of the path to the data on the filesystem
-   * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
-   * @return A new {@code SourceTarget<T>} instance
-   */
-  public static <T extends Writable> SourceTarget<T> sequenceFile(String pathName, Class<T> valueClass) {
-    return sequenceFile(new Path(pathName), valueClass);
-  }
-
-  /**
-   * Creates a {@code SourceTarget<T>} instance from the SequenceFile(s) at the given {@code Path},
-   * using the value field of each key-value pair in the SequenceFile(s).
-   * 
-   * @param path The {@code Path} to the data
-   * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
-   * @return A new {@code SourceTarget<T>} instance
-   */
-  public static <T extends Writable> SourceTarget<T> sequenceFile(Path path, Class<T> valueClass) {
-    return sequenceFile(path, Writables.writables(valueClass));
-  }
-  
-  /**
-   * Creates a {@code SourceTarget<T>} instance from the SequenceFile(s) at the given path name,
-   * using the value field of each key-value pair in the SequenceFile(s).
-   * 
-   * @param pathName The name of the path to the data on the filesystem
-   * @param ptype The {@code PType} for the value of the SequenceFile entry
-   * @return A new {@code SourceTarget<T>} instance
-   */
-  public static <T> SourceTarget<T> sequenceFile(String pathName, PType<T> ptype) {
-    return sequenceFile(new Path(pathName), ptype);
-  }
-
-  /**
-   * Creates a {@code SourceTarget<T>} instance from the SequenceFile(s) at the given {@code Path},
-   * using the value field of each key-value pair in the SequenceFile(s).
-   * 
-   * @param path The {@code Path} to the data
-   * @param ptype The {@code PType} for the value of the SequenceFile entry
-   * @return A new {@code SourceTarget<T>} instance
-   */
-  public static <T> SourceTarget<T> sequenceFile(Path path, PType<T> ptype) {
-    return new SeqFileSourceTarget<T>(path, ptype);
-  }
-
-  /**
-   * Creates a {@code TableSourceTarget<K, V>} instance from the SequenceFile(s) at the given path name,
-   * using the key-value pairs in the SequenceFile(s).
-   * 
-   * @param pathName The name of the path to the data on the filesystem
-   * @param keyClass The {@code Writable} type for the key of the SequenceFile entry
-   * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
-   * @return A new {@code TableSourceTarget<K, V>} instance
-   */
-  public static <K extends Writable, V extends Writable> TableSourceTarget<K, V> sequenceFile(
-      String pathName, Class<K> keyClass, Class<V> valueClass) {
-    return sequenceFile(new Path(pathName), keyClass, valueClass);
-  }
-
-  /**
-   * Creates a {@code TableSourceTarget<K, V>} instance from the SequenceFile(s) at the given {@code Path},
-   * using the key-value pairs in the SequenceFile(s).
-   * 
-   * @param path The {@code Path} to the data
-   * @param keyClass The {@code Writable} type for the key of the SequenceFile entry
-   * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
-   * @return A new {@code TableSourceTarget<K, V>} instance
-   */
-  public static <K extends Writable, V extends Writable> TableSourceTarget<K, V> sequenceFile(
-      Path path, Class<K> keyClass, Class<V> valueClass) {
-    return sequenceFile(path, Writables.writables(keyClass), Writables.writables(valueClass));
-  }
-  
-  /**
-   * Creates a {@code TableSourceTarget<K, V>} instance from the SequenceFile(s) at the given path name,
-   * using the key-value pairs in the SequenceFile(s).
-   * 
-   * @param pathName The name of the path to the data on the filesystem
-   * @param keyType The {@code PType} for the key of the SequenceFile entry
-   * @param valueType The {@code PType} for the value of the SequenceFile entry
-   * @return A new {@code TableSourceTarget<K, V>} instance
-   */
-  public static <K, V> TableSourceTarget<K, V> sequenceFile(String pathName, PType<K> keyType, PType<V> valueType) {
-    return sequenceFile(new Path(pathName), keyType, valueType);
-  }
-
-  /**
-   * Creates a {@code TableSourceTarget<K, V>} instance from the SequenceFile(s) at the given {@code Path},
-   * using the key-value pairs in the SequenceFile(s).
-   * 
-   * @param path The {@code Path} to the data
-   * @param keyType The {@code PType} for the key of the SequenceFile entry
-   * @param valueType The {@code PType} for the value of the SequenceFile entry
-   * @return A new {@code TableSourceTarget<K, V>} instance
-   */
-  public static <K, V> TableSourceTarget<K, V> sequenceFile(Path path, PType<K> keyType, PType<V> valueType) {
-    PTypeFamily ptf = keyType.getFamily();
-    return new SeqFileTableSourceTarget<K, V>(path, ptf.tableOf(keyType, valueType));
-  }
-
-  /**
-   * Creates a {@code SourceTarget<String>} instance for the text file(s) at the given path name.
-   * 
-   * @param pathName The name of the path to the data on the filesystem
-   * @return A new {@code SourceTarget<String>} instance
-   */
-  public static SourceTarget<String> textFile(String pathName) {
-    return textFile(new Path(pathName));
-  }
-
-  /**
-   * Creates a {@code SourceTarget<String>} instance for the text file(s) at the given {@code Path}.
-   * 
-   * @param path The {@code Path} to the data
-   * @return A new {@code SourceTarget<String>} instance
-   */
-  public static SourceTarget<String> textFile(Path path) {
-    return textFile(path, Writables.strings());
-  }
-
-  /**
-   * Creates a {@code SourceTarget<T>} instance for the text file(s) at the given path name using
-   * the provided {@code PType<T>} to convert the input text.
-   * 
-   * @param pathName The name of the path to the data on the filesystem
-   * @param ptype The {@code PType<T>} to use to process the input text
-   * @return A new {@code SourceTarget<T>} instance
-   */
-  public static <T> SourceTarget<T> textFile(String pathName, PType<T> ptype) {
-    return textFile(new Path(pathName), ptype);
-  }
-
-  /**
-   * Creates a {@code SourceTarget<T>} instance for the text file(s) at the given {@code Path} using
-   * the provided {@code PType<T>} to convert the input text.
-   * 
-   * @param path The {@code Path} to the data
-   * @param ptype The {@code PType<T>} to use to process the input text
-   * @return A new {@code SourceTarget<T>} instance
-   */
-  public static <T> SourceTarget<T> textFile(Path path, PType<T> ptype) {
-    return new TextFileSourceTarget<T>(path, ptype);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/CompositePathIterable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/CompositePathIterable.java b/crunch/src/main/java/org/apache/crunch/io/CompositePathIterable.java
deleted file mode 100644
index a4723e9..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/CompositePathIterable.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-
-import com.google.common.collect.UnmodifiableIterator;
-
-public class CompositePathIterable<T> implements Iterable<T> {
-
-  private final FileStatus[] stati;
-  private final FileSystem fs;
-  private final FileReaderFactory<T> readerFactory;
-
-  private static final PathFilter FILTER = new PathFilter() {
-    @Override
-    public boolean accept(Path path) {
-      return !path.getName().startsWith("_");
-    }
-  };
-
-  public static <S> Iterable<S> create(FileSystem fs, Path path, FileReaderFactory<S> readerFactory) throws IOException {
-
-    if (!fs.exists(path)) {
-      throw new IOException("No files found to materialize at: " + path);
-    }
-
-    FileStatus[] stati = null;
-    try {
-      stati = fs.listStatus(path, FILTER);
-    } catch (FileNotFoundException e) {
-      stati = null;
-    }
-    if (stati == null) {
-      throw new IOException("No files found to materialize at: " + path);
-    }
-
-    if (stati.length == 0) {
-      return Collections.emptyList();
-    } else {
-      return new CompositePathIterable<S>(stati, fs, readerFactory);
-    }
-
-  }
-
-  private CompositePathIterable(FileStatus[] stati, FileSystem fs, FileReaderFactory<T> readerFactory) {
-    this.stati = stati;
-    this.fs = fs;
-    this.readerFactory = readerFactory;
-  }
-
-  @Override
-  public Iterator<T> iterator() {
-
-    return new UnmodifiableIterator<T>() {
-      private int index = 0;
-      private Iterator<T> iter = readerFactory.read(fs, stati[index++].getPath());
-
-      @Override
-      public boolean hasNext() {
-        if (!iter.hasNext()) {
-          while (index < stati.length) {
-            iter = readerFactory.read(fs, stati[index++].getPath());
-            if (iter.hasNext()) {
-              return true;
-            }
-          }
-          return false;
-        }
-        return true;
-      }
-
-      @Override
-      public T next() {
-        return iter.next();
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/CrunchInputs.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/CrunchInputs.java b/crunch/src/main/java/org/apache/crunch/io/CrunchInputs.java
deleted file mode 100644
index d154db2..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/CrunchInputs.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Helper functions for configuring multiple {@code InputFormat} instances within a single
- * Crunch MapReduce job.
- */
-public class CrunchInputs {
-  public static final String CRUNCH_INPUTS = "crunch.inputs.dir";
-
-  private static final char RECORD_SEP = ',';
-  private static final char FIELD_SEP = ';';
-  private static final Joiner JOINER = Joiner.on(FIELD_SEP);
-  private static final Splitter SPLITTER = Splitter.on(FIELD_SEP);
-
-  public static void addInputPath(Job job, Path path, FormatBundle inputBundle, int nodeIndex) {
-    Configuration conf = job.getConfiguration();
-    String inputs = JOINER.join(inputBundle.serialize(), String.valueOf(nodeIndex), path.toString());
-    String existing = conf.get(CRUNCH_INPUTS);
-    conf.set(CRUNCH_INPUTS, existing == null ? inputs : existing + RECORD_SEP + inputs);
-  }
-
-  public static Map<FormatBundle, Map<Integer, List<Path>>> getFormatNodeMap(JobContext job) {
-    Map<FormatBundle, Map<Integer, List<Path>>> formatNodeMap = Maps.newHashMap();
-    Configuration conf = job.getConfiguration();
-    for (String input : Splitter.on(RECORD_SEP).split(conf.get(CRUNCH_INPUTS))) {
-      List<String> fields = Lists.newArrayList(SPLITTER.split(input));
-      FormatBundle<InputFormat> inputBundle = FormatBundle.fromSerialized(fields.get(0), InputFormat.class);
-      if (!formatNodeMap.containsKey(inputBundle)) {
-        formatNodeMap.put(inputBundle, Maps.<Integer, List<Path>> newHashMap());
-      }
-      Integer nodeIndex = Integer.valueOf(fields.get(1));
-      if (!formatNodeMap.get(inputBundle).containsKey(nodeIndex)) {
-        formatNodeMap.get(inputBundle).put(nodeIndex, Lists.<Path> newLinkedList());
-      }
-      formatNodeMap.get(inputBundle).get(nodeIndex).add(new Path(fields.get(2)));
-    }
-    return formatNodeMap;
-  }
-
-}