Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:13 UTC
[11/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
deleted file mode 100644
index f22b5a1..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.plan;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.Target;
-import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
-import org.apache.crunch.impl.mr.collect.DoTableImpl;
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
-import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
-import org.apache.crunch.impl.mr.exec.CrunchJobHooks;
-import org.apache.crunch.impl.mr.run.CrunchCombiner;
-import org.apache.crunch.impl.mr.run.CrunchInputFormat;
-import org.apache.crunch.impl.mr.run.CrunchMapper;
-import org.apache.crunch.impl.mr.run.CrunchReducer;
-import org.apache.crunch.impl.mr.run.NodeContext;
-import org.apache.crunch.impl.mr.run.RTNode;
-import org.apache.crunch.util.DistCache;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-class JobPrototype {
-
- public static JobPrototype createMapReduceJob(int jobID, PGroupedTableImpl<?, ?> group,
- Set<NodePath> inputs, Path workingPath) {
- return new JobPrototype(jobID, inputs, group, workingPath);
- }
-
- public static JobPrototype createMapOnlyJob(int jobID, HashMultimap<Target, NodePath> mapNodePaths, Path workingPath) {
- return new JobPrototype(jobID, mapNodePaths, workingPath);
- }
-
- private final int jobID; // TODO: maybe stageID sounds better
- private final Set<NodePath> mapNodePaths;
- private final PGroupedTableImpl<?, ?> group;
- private final Set<JobPrototype> dependencies = Sets.newHashSet();
- private final Map<PCollectionImpl<?>, DoNode> nodes = Maps.newHashMap();
- private final Path workingPath;
-
- private HashMultimap<Target, NodePath> targetsToNodePaths;
- private DoTableImpl<?, ?> combineFnTable;
-
- private CrunchControlledJob job;
-
- private JobPrototype(int jobID, Set<NodePath> inputs, PGroupedTableImpl<?, ?> group, Path workingPath) {
- this.jobID = jobID;
- this.mapNodePaths = ImmutableSet.copyOf(inputs);
- this.group = group;
- this.workingPath = workingPath;
- this.targetsToNodePaths = null;
- }
-
- private JobPrototype(int jobID, HashMultimap<Target, NodePath> outputPaths, Path workingPath) {
- this.jobID = jobID;
- this.group = null;
- this.mapNodePaths = null;
- this.workingPath = workingPath;
- this.targetsToNodePaths = outputPaths;
- }
-
- public int getJobID() {
- return jobID;
- }
-
- public boolean isMapOnly() {
- return this.group == null;
- }
-
- Set<NodePath> getMapNodePaths() {
- return mapNodePaths;
- }
-
- PGroupedTableImpl<?, ?> getGroupingTable() {
- return group;
- }
-
- HashMultimap<Target, NodePath> getTargetsToNodePaths() {
- return targetsToNodePaths;
- }
-
- public void addReducePaths(HashMultimap<Target, NodePath> outputPaths) {
- if (group == null) {
- throw new IllegalStateException("Cannot add a reduce phase to a map-only job");
- }
- this.targetsToNodePaths = outputPaths;
- }
-
- public void addDependency(JobPrototype dependency) {
- this.dependencies.add(dependency);
- }
-
- public CrunchControlledJob getCrunchJob(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException {
- if (job == null) {
- job = build(jarClass, conf, pipeline);
- for (JobPrototype proto : dependencies) {
- job.addDependingJob(proto.getCrunchJob(jarClass, conf, pipeline));
- }
- }
- return job;
- }
-
- private CrunchControlledJob build(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException {
- Job job = new Job(conf);
- conf = job.getConfiguration();
- conf.set(PlanningParameters.CRUNCH_WORKING_DIRECTORY, workingPath.toString());
- job.setJarByClass(jarClass);
-
- Set<DoNode> outputNodes = Sets.newHashSet();
- Set<Target> targets = targetsToNodePaths.keySet();
- Path outputPath = new Path(workingPath, "output");
- MSCROutputHandler outputHandler = new MSCROutputHandler(job, outputPath, group == null);
- for (Target target : targets) {
- DoNode node = null;
- for (NodePath nodePath : targetsToNodePaths.get(target)) {
- if (node == null) {
- PCollectionImpl<?> collect = nodePath.tail();
- node = DoNode.createOutputNode(target.toString(), collect.getPType());
- outputHandler.configureNode(node, target);
- }
- outputNodes.add(walkPath(nodePath.descendingIterator(), node));
- }
- }
-
- job.setMapperClass(CrunchMapper.class);
- List<DoNode> inputNodes;
- DoNode reduceNode = null;
- if (group != null) {
- job.setReducerClass(CrunchReducer.class);
- List<DoNode> reduceNodes = Lists.newArrayList(outputNodes);
- serialize(reduceNodes, conf, workingPath, NodeContext.REDUCE);
- reduceNode = reduceNodes.get(0);
-
- if (combineFnTable != null) {
- job.setCombinerClass(CrunchCombiner.class);
- DoNode combinerInputNode = group.createDoNode();
- DoNode combineNode = combineFnTable.createDoNode();
- combineNode.addChild(group.getGroupingNode());
- combinerInputNode.addChild(combineNode);
- serialize(ImmutableList.of(combinerInputNode), conf, workingPath, NodeContext.COMBINE);
- }
-
- group.configureShuffle(job);
-
- DoNode mapOutputNode = group.getGroupingNode();
- Set<DoNode> mapNodes = Sets.newHashSet();
- for (NodePath nodePath : mapNodePaths) {
- // Advance the iterator one step: the grouping node has already been
- // configured, and the PGroupedTableImpl is the tail of the NodePath.
- Iterator<PCollectionImpl<?>> iter = nodePath.descendingIterator();
- iter.next();
- mapNodes.add(walkPath(iter, mapOutputNode));
- }
- inputNodes = Lists.newArrayList(mapNodes);
- } else { // No grouping
- job.setNumReduceTasks(0);
- inputNodes = Lists.newArrayList(outputNodes);
- }
- serialize(inputNodes, conf, workingPath, NodeContext.MAP);
-
- if (inputNodes.size() == 1) {
- DoNode inputNode = inputNodes.get(0);
- inputNode.getSource().configureSource(job, -1);
- } else {
- for (int i = 0; i < inputNodes.size(); i++) {
- DoNode inputNode = inputNodes.get(i);
- inputNode.getSource().configureSource(job, i);
- }
- job.setInputFormatClass(CrunchInputFormat.class);
- }
- job.setJobName(createJobName(pipeline.getName(), inputNodes, reduceNode));
-
- return new CrunchControlledJob(
- jobID,
- job,
- new CrunchJobHooks.PrepareHook(job),
- new CrunchJobHooks.CompletionHook(job, outputPath, outputHandler.getMultiPaths(), group == null));
- }
-
- private void serialize(List<DoNode> nodes, Configuration conf, Path workingPath, NodeContext context)
- throws IOException {
- List<RTNode> rtNodes = Lists.newArrayList();
- for (DoNode node : nodes) {
- rtNodes.add(node.toRTNode(true, conf, context));
- }
- Path path = new Path(workingPath, context.toString());
- DistCache.write(conf, path, rtNodes);
- }
-
- private String createJobName(String pipelineName, List<DoNode> mapNodes, DoNode reduceNode) {
- JobNameBuilder builder = new JobNameBuilder(pipelineName);
- builder.visit(mapNodes);
- if (reduceNode != null) {
- builder.visit(reduceNode);
- }
- return builder.build();
- }
-
- private DoNode walkPath(Iterator<PCollectionImpl<?>> iter, DoNode working) {
- while (iter.hasNext()) {
- PCollectionImpl<?> collect = iter.next();
- if (combineFnTable != null && !(collect instanceof PGroupedTableImpl)) {
- combineFnTable = null;
- } else if (collect instanceof DoTableImpl && ((DoTableImpl<?, ?>) collect).hasCombineFn()) {
- combineFnTable = (DoTableImpl<?, ?>) collect;
- }
- if (!nodes.containsKey(collect)) {
- nodes.put(collect, collect.createDoNode());
- }
- DoNode parent = nodes.get(collect);
- parent.addChild(working);
- working = parent;
- }
- return working;
- }
-}
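
A note on the JobPrototype removed above: getCrunchJob builds its
CrunchControlledJob lazily and memoizes it, then resolves its dependencies
recursively, so a prototype that several downstream jobs depend on is only
planned once. A minimal standalone sketch of that memoized-DAG pattern
(Proto and PlannedJob are hypothetical names, not Crunch classes):

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class PlannedJob {
      private final int id;
      private final List<PlannedJob> upstream = new ArrayList<PlannedJob>();
      PlannedJob(int id) { this.id = id; }
      void addDependingJob(PlannedJob job) { upstream.add(job); }
    }

    class Proto {
      private final int id;
      private final Set<Proto> dependencies = new HashSet<Proto>();
      private PlannedJob job; // memoized; built at most once

      Proto(int id) { this.id = id; }
      void addDependency(Proto dep) { dependencies.add(dep); }

      PlannedJob resolve() {
        if (job == null) {
          job = new PlannedJob(id);
          for (Proto dep : dependencies) {
            // Recursion terminates because the prototypes form a DAG.
            job.addDependingJob(dep.resolve());
          }
        }
        return job;
      }
    }
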
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
deleted file mode 100644
index 36c565e..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.plan;
-
-import java.util.Map;
-
-import org.apache.crunch.Target;
-import org.apache.crunch.io.MapReduceTarget;
-import org.apache.crunch.io.OutputHandler;
-import org.apache.crunch.io.PathTarget;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
-import com.google.common.collect.Maps;
-
-public class MSCROutputHandler implements OutputHandler {
-
- private final Job job;
- private final Path path;
- private final boolean mapOnlyJob;
-
- private DoNode workingNode;
- private Map<Integer, PathTarget> multiPaths;
- private int jobCount;
-
- public MSCROutputHandler(Job job, Path outputPath, boolean mapOnlyJob) {
- this.job = job;
- this.path = outputPath;
- this.mapOnlyJob = mapOnlyJob;
- this.multiPaths = Maps.newHashMap();
- }
-
- public void configureNode(DoNode node, Target target) {
- workingNode = node;
- target.accept(this, node.getPType());
- }
-
- public boolean configure(Target target, PType<?> ptype) {
- if (target instanceof MapReduceTarget) {
- if (target instanceof PathTarget) {
- multiPaths.put(jobCount, (PathTarget) target);
- }
-
- String name = PlanningParameters.MULTI_OUTPUT_PREFIX + jobCount;
- jobCount++;
- workingNode.setOutputName(name);
- ((MapReduceTarget) target).configureForMapReduce(job, ptype, path, name);
- return true;
- }
-
- return false;
- }
-
- public boolean isMapOnlyJob() {
- return mapOnlyJob;
- }
-
- public Map<Integer, PathTarget> getMultiPaths() {
- return multiPaths;
- }
-}
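
MSCROutputHandler above relies on double dispatch: configureNode hands the
handler to the target via accept, and a MapReduceTarget calls back into
configure, where the handler assigns the next multi-output name
(PlanningParameters.MULTI_OUTPUT_PREFIX plus a counter, i.e. "out0",
"out1", ...). A reduced sketch of that callback shape, with hypothetical
Handler/FileTarget types standing in for the Crunch interfaces:

    interface Handler {
      boolean configure(Target target);
    }

    interface Target {
      void accept(Handler handler);
    }

    class FileTarget implements Target {
      @Override
      public void accept(Handler handler) {
        // Double dispatch: the target calls back into the handler, which
        // then assigns it a numbered output name.
        handler.configure(this);
      }
    }

    class CountingHandler implements Handler {
      private int count = 0;
      @Override
      public boolean configure(Target target) {
        String name = "out" + count++; // mirrors MULTI_OUTPUT_PREFIX + jobCount
        System.out.println("configured " + target + " as " + name);
        return true;
      }
    }

    public class Demo {
      public static void main(String[] args) {
        new FileTarget().accept(new CountingHandler()); // ... as out0
      }
    }
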
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
deleted file mode 100644
index 3e1de38..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.plan;
-
-import java.io.IOException;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.Target;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.impl.mr.collect.InputCollection;
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
-import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
-import org.apache.crunch.impl.mr.exec.MRExecutor;
-import org.apache.crunch.materialize.MaterializableIterable;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-
-public class MSCRPlanner {
-
- private final MRPipeline pipeline;
- private final Map<PCollectionImpl<?>, Set<Target>> outputs;
- private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
- private int lastJobID = 0;
-
- public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs,
- Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize) {
- this.pipeline = pipeline;
- this.outputs = new TreeMap<PCollectionImpl<?>, Set<Target>>(DEPTH_COMPARATOR);
- this.outputs.putAll(outputs);
- this.toMaterialize = toMaterialize;
- }
-
- // Orders collections so that pipelines are always built starting from the
- // deepest outputs, which helps guarantee that intermediate outputs are
- // handled correctly.
- private static final Comparator<PCollectionImpl<?>> DEPTH_COMPARATOR = new Comparator<PCollectionImpl<?>>() {
- @Override
- public int compare(PCollectionImpl<?> left, PCollectionImpl<?> right) {
- int cmp = right.getDepth() - left.getDepth();
- if (cmp == 0) {
- // Break ties so that two distinct output collections at the same depth
- // don't compare as equal and collapse into a single TreeMap entry.
- // Using the collection name would be nicer here, but names aren't
- // necessarily unique.
- cmp = new Integer(right.hashCode()).compareTo(left.hashCode());
- }
- return cmp;
- }
- };
-
- public MRExecutor plan(Class<?> jarClass, Configuration conf) throws IOException {
- Map<PCollectionImpl<?>, Set<SourceTarget<?>>> targetDeps = Maps.newTreeMap(DEPTH_COMPARATOR);
- for (PCollectionImpl<?> pcollect : outputs.keySet()) {
- targetDeps.put(pcollect, pcollect.getTargetDependencies());
- }
-
- Multimap<Vertex, JobPrototype> assignments = HashMultimap.create();
- Multimap<PCollectionImpl<?>, Vertex> protoDependency = HashMultimap.create();
- while (!targetDeps.isEmpty()) {
- Set<Target> allTargets = Sets.newHashSet();
- for (PCollectionImpl<?> pcollect : targetDeps.keySet()) {
- allTargets.addAll(outputs.get(pcollect));
- }
- GraphBuilder graphBuilder = new GraphBuilder();
-
- // Walk the current plan tree and build a graph in which the vertices are
- // sources, targets, and GBK operations.
- Set<PCollectionImpl<?>> currentStage = Sets.newHashSet();
- Set<PCollectionImpl<?>> laterStage = Sets.newHashSet();
- for (PCollectionImpl<?> output : targetDeps.keySet()) {
- if (Sets.intersection(allTargets, targetDeps.get(output)).isEmpty()) {
- graphBuilder.visitOutput(output);
- currentStage.add(output);
- } else {
- laterStage.add(output);
- }
- }
-
- Graph baseGraph = graphBuilder.getGraph();
-
- // Create a new graph that splits up dependent GBK nodes.
- Graph graph = prepareFinalGraph(baseGraph);
-
- // Break the graph up into connected components.
- List<List<Vertex>> components = graph.connectedComponents();
-
- // For each component, we will create one or more job prototypes,
- // depending on its profile.
- // For dependency handling, we only need to care about which
- // job prototype a particular GBK is assigned to.
- for (List<Vertex> component : components) {
- assignments.putAll(constructJobPrototypes(component));
- }
-
- // Add in the job dependency information here.
- for (Map.Entry<Vertex, JobPrototype> e : assignments.entries()) {
- JobPrototype current = e.getValue();
- List<Vertex> parents = graph.getParents(e.getKey());
- for (Vertex parent : parents) {
- for (JobPrototype parentJobProto : assignments.get(parent)) {
- current.addDependency(parentJobProto);
- }
- }
- }
-
- // Add cross-stage dependencies.
- for (PCollectionImpl<?> output : currentStage) {
- Set<Target> targets = outputs.get(output);
- Vertex vertex = graph.getVertexAt(output);
- for (PCollectionImpl<?> later : laterStage) {
- if (!Sets.intersection(targets, targetDeps.get(later)).isEmpty()) {
- protoDependency.put(later, vertex);
- }
- }
- targetDeps.remove(output);
- }
- }
-
- // Cross-job dependencies.
- for (Entry<PCollectionImpl<?>, Vertex> pd : protoDependency.entries()) {
- Vertex d = new Vertex(pd.getKey());
- Vertex dj = pd.getValue();
- for (JobPrototype parent : assignments.get(dj)) {
- for (JobPrototype child : assignments.get(d)) {
- child.addDependency(parent);
- }
- }
- }
-
- // Finally, construct the jobs from the prototypes and return.
- DotfileWriter dotfileWriter = new DotfileWriter();
- MRExecutor exec = new MRExecutor(jarClass, outputs, toMaterialize);
- for (JobPrototype proto : Sets.newHashSet(assignments.values())) {
- dotfileWriter.addJobPrototype(proto);
- exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline));
- }
-
- String planDotFile = dotfileWriter.buildDotfile();
- exec.setPlanDotFile(planDotFile);
- conf.set(PlanningParameters.PIPELINE_PLAN_DOTFILE, planDotFile);
-
- return exec;
- }
-
- private Graph prepareFinalGraph(Graph baseGraph) {
- Graph graph = new Graph();
-
- for (Vertex baseVertex : baseGraph) {
- // Add all of the vertices in the base graph, but no edges (yet).
- graph.addVertex(baseVertex.getPCollection(), baseVertex.isOutput());
- }
-
- for (Edge e : baseGraph.getAllEdges()) {
- // Add back all of the edges where neither vertex is a GBK and we do not
- // have an output feeding into a GBK.
- if (!(e.getHead().isGBK() && e.getTail().isGBK()) &&
- !(e.getHead().isOutput() && e.getTail().isGBK())) {
- Vertex head = graph.getVertexAt(e.getHead().getPCollection());
- Vertex tail = graph.getVertexAt(e.getTail().getPCollection());
- graph.getEdge(head, tail).addAllNodePaths(e.getNodePaths());
- }
- }
-
- for (Vertex baseVertex : baseGraph) {
- if (baseVertex.isGBK()) {
- Vertex vertex = graph.getVertexAt(baseVertex.getPCollection());
- for (Edge e : baseVertex.getIncomingEdges()) {
- if (e.getHead().isOutput()) {
- // Execute an edge split.
- Vertex splitTail = e.getHead();
- PCollectionImpl<?> split = splitTail.getPCollection();
- InputCollection<?> inputNode = handleSplitTarget(split);
- Vertex splitHead = graph.addVertex(inputNode, false);
-
- // Divide up each node path in the edge so that one piece is owned by
- // the GBK -> splitTail edge and the remainder by the splitHead -> GBK edge.
- for (NodePath path : e.getNodePaths()) {
- NodePath headPath = path.splitAt(split, splitHead.getPCollection());
- graph.getEdge(vertex, splitTail).addNodePath(headPath);
- graph.getEdge(splitHead, vertex).addNodePath(path);
- }
-
- // Note the dependency between the vertices in the graph.
- graph.markDependency(splitHead, splitTail);
- } else if (!e.getHead().isGBK()) {
- Vertex newHead = graph.getVertexAt(e.getHead().getPCollection());
- graph.getEdge(newHead, vertex).addAllNodePaths(e.getNodePaths());
- }
- }
- for (Edge e : baseVertex.getOutgoingEdges()) {
- if (!e.getTail().isGBK()) {
- Vertex newTail = graph.getVertexAt(e.getTail().getPCollection());
- graph.getEdge(vertex, newTail).addAllNodePaths(e.getNodePaths());
- } else {
- // Execute an edge split.
- Vertex newGraphTail = graph.getVertexAt(e.getTail().getPCollection());
- PCollectionImpl split = e.getSplit();
- InputCollection<?> inputNode = handleSplitTarget(split);
- Vertex splitTail = graph.addVertex(split, true);
- Vertex splitHead = graph.addVertex(inputNode, false);
-
- // Divide up each node path in the edge so that one piece is owned by
- // the vertex -> splitTail edge and the remainder by the
- // splitHead -> newGraphTail edge.
- for (NodePath path : e.getNodePaths()) {
- NodePath headPath = path.splitAt(split, splitHead.getPCollection());
- graph.getEdge(vertex, splitTail).addNodePath(headPath);
- graph.getEdge(splitHead, newGraphTail).addNodePath(path);
- }
-
- // Note the dependency between the vertices in the graph.
- graph.markDependency(splitHead, splitTail);
- }
- }
- }
- }
-
- return graph;
- }
-
- private Multimap<Vertex, JobPrototype> constructJobPrototypes(List<Vertex> component) {
- Multimap<Vertex, JobPrototype> assignment = HashMultimap.create();
- List<Vertex> gbks = Lists.newArrayList();
- for (Vertex v : component) {
- if (v.isGBK()) {
- gbks.add(v);
- }
- }
-
- if (gbks.isEmpty()) {
- HashMultimap<Target, NodePath> outputPaths = HashMultimap.create();
- for (Vertex v : component) {
- if (v.isInput()) {
- for (Edge e : v.getOutgoingEdges()) {
- for (NodePath nodePath : e.getNodePaths()) {
- PCollectionImpl target = nodePath.tail();
- for (Target t : outputs.get(target)) {
- outputPaths.put(t, nodePath);
- }
- }
- }
- }
- }
- if (outputPaths.isEmpty()) {
- throw new IllegalStateException("No outputs?");
- }
- JobPrototype prototype = JobPrototype.createMapOnlyJob(
- ++lastJobID, outputPaths, pipeline.createTempPath());
- for (Vertex v : component) {
- assignment.put(v, prototype);
- }
- } else {
- Set<Edge> usedEdges = Sets.newHashSet();
- for (Vertex g : gbks) {
- Set<NodePath> inputs = Sets.newHashSet();
- for (Edge e : g.getIncomingEdges()) {
- inputs.addAll(e.getNodePaths());
- usedEdges.add(e);
- }
- JobPrototype prototype = JobPrototype.createMapReduceJob(
- ++lastJobID, (PGroupedTableImpl) g.getPCollection(), inputs, pipeline.createTempPath());
- assignment.put(g, prototype);
- for (Edge e : g.getIncomingEdges()) {
- assignment.put(e.getHead(), prototype);
- usedEdges.add(e);
- }
- HashMultimap<Target, NodePath> outputPaths = HashMultimap.create();
- for (Edge e : g.getOutgoingEdges()) {
- Vertex output = e.getTail();
- for (Target t : outputs.get(output.getPCollection())) {
- outputPaths.putAll(t, e.getNodePaths());
- }
- assignment.put(output, prototype);
- usedEdges.add(e);
- }
- prototype.addReducePaths(outputPaths);
- }
-
- // Check for any unassigned vertices, which should be map-side outputs
- // that we will need to run in a map-only job.
- HashMultimap<Target, NodePath> outputPaths = HashMultimap.create();
- Set<Vertex> orphans = Sets.newHashSet();
- for (Vertex v : component) {
-
- // Check whether this vertex has multiple incoming edges of which only
- // a subset have already been assigned.
- boolean vertexHasUnassignedIncomingEdges = false;
- if (v.isOutput()) {
- for (Edge e : v.getIncomingEdges()) {
- if (!usedEdges.contains(e)) {
- vertexHasUnassignedIncomingEdges = true;
- }
- }
- }
-
- if (v.isOutput() && (vertexHasUnassignedIncomingEdges || !assignment.containsKey(v))) {
- orphans.add(v);
- for (Edge e : v.getIncomingEdges()) {
- if (vertexHasUnassignedIncomingEdges && usedEdges.contains(e)) {
- // We've already dealt with this incoming edge
- continue;
- }
- orphans.add(e.getHead());
- for (NodePath nodePath : e.getNodePaths()) {
- PCollectionImpl target = nodePath.tail();
- for (Target t : outputs.get(target)) {
- outputPaths.put(t, nodePath);
- }
- }
- }
- }
-
- }
- if (!outputPaths.isEmpty()) {
- JobPrototype prototype = JobPrototype.createMapOnlyJob(
- ++lastJobID, outputPaths, pipeline.createTempPath());
- for (Vertex orphan : orphans) {
- assignment.put(orphan, prototype);
- }
- }
- }
-
- return assignment;
- }
-
- private InputCollection<?> handleSplitTarget(PCollectionImpl<?> splitTarget) {
- if (!outputs.containsKey(splitTarget)) {
- outputs.put(splitTarget, Sets.<Target> newHashSet());
- }
-
- SourceTarget srcTarget = null;
- Target targetToReplace = null;
- for (Target t : outputs.get(splitTarget)) {
- if (t instanceof SourceTarget) {
- srcTarget = (SourceTarget<?>) t;
- break;
- } else {
- srcTarget = t.asSourceTarget(splitTarget.getPType());
- if (srcTarget != null) {
- targetToReplace = t;
- break;
- }
- }
- }
- if (targetToReplace != null) {
- outputs.get(splitTarget).remove(targetToReplace);
- } else if (srcTarget == null) {
- srcTarget = pipeline.createIntermediateOutput(splitTarget.getPType());
- }
- outputs.get(splitTarget).add(srcTarget);
- splitTarget.materializeAt(srcTarget);
-
- return (InputCollection<?>) pipeline.read(srcTarget);
- }
-}
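
One subtlety in the planner above: outputs are kept in a TreeMap ordered by
DEPTH_COMPARATOR, and in a TreeMap two keys that compare as equal collapse
into a single entry, which is why the comparator breaks depth ties with
hashCode instead of returning 0. A small illustration of the hazard, using
a hypothetical Node class:

    import java.util.Comparator;
    import java.util.TreeMap;

    public class DepthOrdering {
      static class Node {
        final String name;
        final int depth;
        Node(String name, int depth) { this.name = name; this.depth = depth; }
      }

      public static void main(String[] args) {
        // Deepest first; ties broken by hashCode so distinct nodes at the
        // same depth are not collapsed into one TreeMap entry.
        Comparator<Node> byDepth = new Comparator<Node>() {
          @Override
          public int compare(Node left, Node right) {
            int cmp = right.depth - left.depth;
            if (cmp == 0) {
              cmp = Integer.valueOf(right.hashCode()).compareTo(left.hashCode());
            }
            return cmp;
          }
        };
        TreeMap<Node, String> outputs = new TreeMap<Node, String>(byDepth);
        outputs.put(new Node("a", 2), "first");
        outputs.put(new Node("b", 2), "second"); // kept, not overwritten
        System.out.println(outputs.size());      // prints 2
      }
    }
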
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
deleted file mode 100644
index a090d93..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.plan;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
-
-import com.google.common.collect.Lists;
-
-class NodePath implements Iterable<PCollectionImpl<?>> {
- private LinkedList<PCollectionImpl<?>> path;
-
- public NodePath() {
- this.path = Lists.newLinkedList();
- }
-
- public NodePath(PCollectionImpl<?> tail) {
- this.path = Lists.newLinkedList();
- this.path.add(tail);
- }
-
- public NodePath(NodePath other) {
- this.path = Lists.newLinkedList(other.path);
- }
-
- public void push(PCollectionImpl<?> stage) {
- this.path.push(stage);
- }
-
- public NodePath close(PCollectionImpl<?> head) {
- this.path.push(head);
- return this;
- }
-
- public Iterator<PCollectionImpl<?>> iterator() {
- return path.iterator();
- }
-
- public Iterator<PCollectionImpl<?>> descendingIterator() {
- return path.descendingIterator();
- }
-
- public PCollectionImpl<?> get(int index) {
- return path.get(index);
- }
-
- public PCollectionImpl<?> head() {
- return path.peekFirst();
- }
-
- public PCollectionImpl<?> tail() {
- return path.peekLast();
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null || !(other instanceof NodePath)) {
- return false;
- }
- NodePath nodePath = (NodePath) other;
- return path.equals(nodePath.path);
- }
-
- @Override
- public int hashCode() {
- return 17 + 37 * path.hashCode();
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- for (PCollectionImpl<?> collect : path) {
- sb.append(collect.getName() + "|");
- }
- sb.deleteCharAt(sb.length() - 1);
- return sb.toString();
- }
-
- public NodePath splitAt(int splitIndex, PCollectionImpl<?> newHead) {
- NodePath top = new NodePath();
- for (int i = 0; i <= splitIndex; i++) {
- top.path.add(path.get(i));
- }
- LinkedList<PCollectionImpl<?>> nextPath = Lists.newLinkedList();
- nextPath.add(newHead);
- nextPath.addAll(path.subList(splitIndex + 1, path.size()));
- path = nextPath;
- return top;
- }
-
- public NodePath splitAt(PCollectionImpl split, PCollectionImpl<?> newHead) {
- NodePath top = new NodePath();
- int splitIndex = 0;
- for (PCollectionImpl p : path) {
- top.path.add(p);
- if (p == split) {
- break;
- }
- splitIndex++;
- }
- LinkedList<PCollectionImpl<?>> nextPath = Lists.newLinkedList();
- nextPath.add(newHead);
- nextPath.addAll(path.subList(splitIndex + 1, path.size()));
- path = nextPath;
- return top;
- }
-}
\ No newline at end of file
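
NodePath.splitAt is easiest to see with a toy example: the prefix up to and
including the split element is returned as a new path, while the receiver is
rewritten to begin at the replacement head. The same logic over plain
strings (a hypothetical demo, outside Crunch):

    import java.util.LinkedList;

    public class SplitDemo {
      public static void main(String[] args) {
        LinkedList<String> path = new LinkedList<String>();
        path.add("A"); path.add("B"); path.add("C"); path.add("D");

        String split = "B";
        String newHead = "B'";

        // Prefix up to and including the split element.
        LinkedList<String> top = new LinkedList<String>();
        int splitIndex = 0;
        for (String p : path) {
          top.add(p);
          if (p.equals(split)) {
            break;
          }
          splitIndex++;
        }

        // Remainder, re-rooted at the replacement head.
        LinkedList<String> rest = new LinkedList<String>();
        rest.add(newHead);
        rest.addAll(path.subList(splitIndex + 1, path.size()));

        System.out.println(top);  // [A, B]
        System.out.println(rest); // [B', C, D]
      }
    }
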
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
deleted file mode 100644
index b90a911..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.plan;
-
-/**
- * Collection of Configuration keys and various constants used when planning MapReduce jobs for a
- * pipeline.
- */
-public class PlanningParameters {
-
- public static final String MULTI_OUTPUT_PREFIX = "out";
-
- public static final String CRUNCH_WORKING_DIRECTORY = "crunch.work.dir";
-
- /**
- * Configuration key under which a <a href="http://www.graphviz.org">DOT</a> file containing the
- * pipeline job graph is stored by the planner.
- */
- public static final String PIPELINE_PLAN_DOTFILE = "crunch.planner.dotfile";
-
- private PlanningParameters() {
- }
-}
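
As MSCRPlanner above shows, the planner both hands the DOT plan to
MRExecutor.setPlanDotFile and stores it in the Configuration under
PIPELINE_PLAN_DOTFILE, so the plan can be recovered from a job's
configuration afterwards; a minimal sketch:

    import org.apache.hadoop.conf.Configuration;

    public class PlanDotfile {
      /** Returns the planner's DOT graph, or null if no plan was stored. */
      static String planDot(Configuration conf) {
        // Key literal mirrors PlanningParameters.PIPELINE_PLAN_DOTFILE above.
        return conf.get("crunch.planner.dotfile");
      }
    }
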
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
deleted file mode 100644
index f4aa668..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.plan;
-
-import java.util.List;
-import java.util.Set;
-
-import org.apache.commons.lang.builder.ReflectionToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-import org.apache.crunch.Source;
-import org.apache.crunch.impl.mr.collect.InputCollection;
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
-import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * A vertex in the planner's job graph, wrapping a single PCollectionImpl
- * that represents an input, an output, or a group-by-key operation.
- */
-class Vertex {
- private final PCollectionImpl impl;
-
- private boolean output;
- private Set<Edge> incoming;
- private Set<Edge> outgoing;
-
- public Vertex(PCollectionImpl impl) {
- this.impl = impl;
- this.incoming = Sets.newHashSet();
- this.outgoing = Sets.newHashSet();
- }
-
- public PCollectionImpl getPCollection() {
- return impl;
- }
-
- public boolean isInput() {
- return impl instanceof InputCollection;
- }
-
- public boolean isGBK() {
- return impl instanceof PGroupedTableImpl;
- }
-
- public void setOutput() {
- this.output = true;
- }
-
- public boolean isOutput() {
- return output;
- }
-
- public Source getSource() {
- if (isInput()) {
- return ((InputCollection) impl).getSource();
- }
- return null;
- }
-
- public void addIncoming(Edge edge) {
- this.incoming.add(edge);
- }
-
- public void addOutgoing(Edge edge) {
- this.outgoing.add(edge);
- }
-
- public List<Vertex> getAllNeighbors() {
- List<Vertex> n = Lists.newArrayList();
- for (Edge e : incoming) {
- n.add(e.getHead());
- }
- for (Edge e : outgoing) {
- n.add(e.getTail());
- }
- return n;
- }
-
- public Set<Edge> getAllEdges() {
- return Sets.union(incoming, outgoing);
- }
-
- public Set<Edge> getIncomingEdges() {
- return incoming;
- }
-
- public Set<Edge> getOutgoingEdges() {
- return outgoing;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == null || !(obj instanceof Vertex)) {
- return false;
- }
- Vertex other = (Vertex) obj;
- return impl.equals(other.impl);
- }
-
- @Override
- public int hashCode() {
- return 17 + 37 * impl.hashCode();
- }
-
- @Override
- public String toString() {
- return new ReflectionToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).setExcludeFieldNames(
- new String[] { "outgoing", "incoming" }).toString();
- }
-}
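
One detail worth noting in Vertex: getAllEdges returns Guava's Sets.union,
which is a live view rather than a snapshot, so edges added to incoming or
outgoing after the call remain visible through the returned set. A quick
demonstration:

    import java.util.Set;
    import com.google.common.collect.Sets;

    public class UnionView {
      public static void main(String[] args) {
        Set<String> incoming = Sets.newHashSet("a");
        Set<String> outgoing = Sets.newHashSet("b");
        Set<String> all = Sets.union(incoming, outgoing);
        System.out.println(all.size()); // 2
        incoming.add("c");
        System.out.println(all.size()); // 3: the union is a live view
      }
    }
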
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java
deleted file mode 100644
index 47a3ded..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.run;
-
-public class CrunchCombiner extends CrunchReducer {
-
- @Override
- protected NodeContext getNodeContext() {
- return NodeContext.COMBINE;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
deleted file mode 100644
index eb5dd8a..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.run;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.crunch.io.CrunchInputs;
-import org.apache.crunch.io.FormatBundle;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.google.common.collect.Lists;
-
-public class CrunchInputFormat<K, V> extends InputFormat<K, V> {
-
- @Override
- public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
- List<InputSplit> splits = Lists.newArrayList();
- Configuration base = job.getConfiguration();
- Map<FormatBundle, Map<Integer, List<Path>>> formatNodeMap = CrunchInputs.getFormatNodeMap(job);
-
- // For each InputFormat bundle, configure a dedicated job copy and
- // collect its splits.
- for (Map.Entry<FormatBundle, Map<Integer, List<Path>>> entry : formatNodeMap.entrySet()) {
- FormatBundle inputBundle = entry.getKey();
- Configuration conf = new Configuration(base);
- inputBundle.configure(conf);
- Job jobCopy = new Job(conf);
- InputFormat<?, ?> format = (InputFormat<?, ?>) ReflectionUtils.newInstance(inputBundle.getFormatClass(),
- jobCopy.getConfiguration());
- for (Map.Entry<Integer, List<Path>> nodeEntry : entry.getValue().entrySet()) {
- Integer nodeIndex = nodeEntry.getKey();
- List<Path> paths = nodeEntry.getValue();
- FileInputFormat.setInputPaths(jobCopy, paths.toArray(new Path[paths.size()]));
-
- // Get splits for each input path and tag each split with its
- // InputFormat class and node index by wrapping it in a CrunchInputSplit.
- List<InputSplit> pathSplits = format.getSplits(jobCopy);
- for (InputSplit pathSplit : pathSplits) {
- splits.add(new CrunchInputSplit(pathSplit, inputBundle.getFormatClass(),
- nodeIndex, jobCopy.getConfiguration()));
- }
- }
- }
- return splits;
- }
-
- @Override
- public RecordReader<K, V> createRecordReader(InputSplit inputSplit, TaskAttemptContext context) throws IOException,
- InterruptedException {
- return new CrunchRecordReader<K, V>(inputSplit, context);
- }
-}
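
Note that getSplits gives every FormatBundle its own Configuration copy
(new Configuration(base)) before letting the bundle configure itself, so
settings applied for one input format cannot leak into another's splits.
The copy constructor's isolation in miniature:

    import org.apache.hadoop.conf.Configuration;

    public class ConfIsolation {
      public static void main(String[] args) {
        Configuration base = new Configuration(false);
        base.set("shared.key", "visible-to-all");

        // Each bundle mutates only its private copy, as in getSplits above.
        Configuration bundleA = new Configuration(base);
        bundleA.set("bundle.only.key", "a-specific");

        Configuration bundleB = new Configuration(base);
        System.out.println(bundleB.get("shared.key"));      // visible-to-all
        System.out.println(bundleB.get("bundle.only.key")); // null: isolated
      }
    }
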
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
deleted file mode 100644
index b41062b..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.run;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.util.ReflectionUtils;
-
-class CrunchInputSplit extends InputSplit implements Writable {
-
- private InputSplit inputSplit;
- private Class<? extends InputFormat<?, ?>> inputFormatClass;
- private int nodeIndex;
- private Configuration conf;
-
- public CrunchInputSplit() {
- // No-arg constructor, required for Writable deserialization.
- }
-
- public CrunchInputSplit(
- InputSplit inputSplit,
- Class<? extends InputFormat<?, ?>> inputFormatClass,
- int nodeIndex,
- Configuration conf) {
- this.inputSplit = inputSplit;
- this.inputFormatClass = inputFormatClass;
- this.nodeIndex = nodeIndex;
- this.conf = conf;
- }
-
- public Configuration getConf() {
- return conf;
- }
-
- public int getNodeIndex() {
- return nodeIndex;
- }
-
- public InputSplit getInputSplit() {
- return inputSplit;
- }
-
- public Class<? extends InputFormat<?, ?>> getInputFormatClass() {
- return inputFormatClass;
- }
-
- @Override
- public long getLength() throws IOException, InterruptedException {
- return inputSplit.getLength();
- }
-
- @Override
- public String[] getLocations() throws IOException, InterruptedException {
- return inputSplit.getLocations();
- }
-
- public void readFields(DataInput in) throws IOException {
- nodeIndex = in.readInt();
- conf = new Configuration();
- conf.readFields(in);
- inputFormatClass = (Class<? extends InputFormat<?, ?>>) readClass(in);
- Class<? extends InputSplit> inputSplitClass = (Class<? extends InputSplit>) readClass(in);
- inputSplit = (InputSplit) ReflectionUtils.newInstance(inputSplitClass, conf);
- SerializationFactory factory = new SerializationFactory(conf);
- Deserializer deserializer = factory.getDeserializer(inputSplitClass);
- deserializer.open((DataInputStream) in);
- inputSplit = (InputSplit) deserializer.deserialize(inputSplit);
- }
-
- private Class<?> readClass(DataInput in) throws IOException {
- String className = Text.readString(in);
- try {
- return conf.getClassByName(className);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("readObject can't find class", e);
- }
- }
-
- public void write(DataOutput out) throws IOException {
- out.writeInt(nodeIndex);
- conf.write(out);
- Text.writeString(out, inputFormatClass.getName());
- Text.writeString(out, inputSplit.getClass().getName());
- SerializationFactory factory = new SerializationFactory(conf);
- Serializer serializer = factory.getSerializer(inputSplit.getClass());
- serializer.open((DataOutputStream) out);
- serializer.serialize(inputSplit);
- }
-}
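
CrunchInputSplit's write/readFields pair follows the standard Hadoop
Writable round trip: fields are written to a DataOutput and read back in
exactly the same order. A self-contained sketch of that contract with a
trivial Writable (a hypothetical demo class, not part of Crunch):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    public class DemoSplitInfo implements Writable {
      private int nodeIndex;
      private String formatClassName;

      public void write(DataOutput out) throws IOException {
        out.writeInt(nodeIndex);                // same order as CrunchInputSplit
        Text.writeString(out, formatClassName);
      }

      public void readFields(DataInput in) throws IOException {
        nodeIndex = in.readInt();               // must mirror write() exactly
        formatClassName = Text.readString(in);
      }

      public static void main(String[] args) throws IOException {
        DemoSplitInfo a = new DemoSplitInfo();
        a.nodeIndex = 3;
        a.formatClassName = "org.example.DemoFormat";

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        a.write(new DataOutputStream(bytes));

        DemoSplitInfo b = new DemoSplitInfo();
        b.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(b.nodeIndex + " " + b.formatClassName);
      }
    }
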
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
deleted file mode 100644
index 70f0b01..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.run;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.hadoop.mapreduce.Mapper;
-
-public class CrunchMapper extends Mapper<Object, Object, Object, Object> {
-
- private static final Log LOG = LogFactory.getLog(CrunchMapper.class);
-
- private RTNode node;
- private CrunchTaskContext ctxt;
- private boolean debug;
-
- @Override
- protected void setup(Mapper<Object, Object, Object, Object>.Context context) {
- List<RTNode> nodes;
- this.ctxt = new CrunchTaskContext(context, NodeContext.MAP);
- try {
- nodes = ctxt.getNodes();
- } catch (IOException e) {
- LOG.info("Crunch deserialization error", e);
- throw new CrunchRuntimeException(e);
- }
- if (nodes.size() == 1) {
- this.node = nodes.get(0);
- } else {
- CrunchInputSplit split = (CrunchInputSplit) context.getInputSplit();
- this.node = nodes.get(split.getNodeIndex());
- }
- this.debug = ctxt.isDebugRun();
- }
-
- @Override
- protected void map(Object k, Object v, Mapper<Object, Object, Object, Object>.Context context) {
- if (debug) {
- try {
- node.process(k, v);
- } catch (Exception e) {
- LOG.error("Mapper exception", e);
- }
- } else {
- node.process(k, v);
- }
- }
-
- @Override
- protected void cleanup(Mapper<Object, Object, Object, Object>.Context context) {
- node.cleanup();
- ctxt.cleanup();
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
deleted file mode 100644
index fc8fb32..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.run;
-
-import java.io.IOException;
-
-import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.ReflectionUtils;
-
-class CrunchRecordReader<K, V> extends RecordReader<K, V> {
-
- private final RecordReader<K, V> delegate;
-
- public CrunchRecordReader(InputSplit inputSplit, final TaskAttemptContext context) throws IOException,
- InterruptedException {
- CrunchInputSplit crunchSplit = (CrunchInputSplit) inputSplit;
- InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils.newInstance(crunchSplit.getInputFormatClass(),
- crunchSplit.getConf());
- this.delegate = inputFormat.createRecordReader(crunchSplit.getInputSplit(),
- TaskAttemptContextFactory.create(crunchSplit.getConf(), context.getTaskAttemptID()));
- }
-
- @Override
- public void close() throws IOException {
- delegate.close();
- }
-
- @Override
- public K getCurrentKey() throws IOException, InterruptedException {
- return delegate.getCurrentKey();
- }
-
- @Override
- public V getCurrentValue() throws IOException, InterruptedException {
- return delegate.getCurrentValue();
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return delegate.getProgress();
- }
-
- @Override
- public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
- CrunchInputSplit crunchSplit = (CrunchInputSplit) inputSplit;
- InputSplit delegateSplit = crunchSplit.getInputSplit();
- delegate.initialize(delegateSplit,
- TaskAttemptContextFactory.create(crunchSplit.getConf(), context.getTaskAttemptID()));
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- return delegate.nextKeyValue();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
deleted file mode 100644
index e5ddbd2..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.run;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.impl.SingleUseIterable;
-import org.apache.hadoop.mapreduce.Reducer;
-
-public class CrunchReducer extends Reducer<Object, Object, Object, Object> {
-
- private static final Log LOG = LogFactory.getLog(CrunchReducer.class);
-
- private RTNode node;
- private CrunchTaskContext ctxt;
- private boolean debug;
-
- protected NodeContext getNodeContext() {
- return NodeContext.REDUCE;
- }
-
- @Override
- protected void setup(Reducer<Object, Object, Object, Object>.Context context) {
- this.ctxt = new CrunchTaskContext(context, getNodeContext());
- try {
- List<RTNode> nodes = ctxt.getNodes();
- this.node = nodes.get(0);
- } catch (IOException e) {
- LOG.info("Crunch deserialization error", e);
- throw new CrunchRuntimeException(e);
- }
- this.debug = ctxt.isDebugRun();
- }
-
- @Override
- protected void reduce(Object key, Iterable<Object> values, Reducer<Object, Object, Object, Object>.Context context) {
- values = new SingleUseIterable<Object>(values);
- if (debug) {
- try {
- node.processIterable(key, values);
- } catch (Exception e) {
- LOG.error("Reducer exception", e);
- }
- } else {
- node.processIterable(key, values);
- }
- }
-
- @Override
- protected void cleanup(Reducer<Object, Object, Object, Object>.Context context) {
- node.cleanup();
- ctxt.cleanup();
- }
-}
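
The reducer wraps the framework's values in a SingleUseIterable before
processing, which, as the name suggests, guards against iterating Hadoop's
reducer values more than once, since the underlying iterator cannot be
rewound. A plausible minimal version of such a guard (a sketch, not the
actual Crunch implementation):

    import java.util.Iterator;

    class OneShotIterable<T> implements Iterable<T> {
      private final Iterable<T> delegate;
      private boolean used = false;

      OneShotIterable(Iterable<T> delegate) { this.delegate = delegate; }

      @Override
      public Iterator<T> iterator() {
        if (used) {
          throw new IllegalStateException(
              "Reducer values may only be iterated once");
        }
        used = true;
        return delegate.iterator();
      }
    }
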
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
deleted file mode 100644
index c4f2873..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.run;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.impl.mr.plan.PlanningParameters;
-import org.apache.crunch.io.CrunchOutputs;
-import org.apache.crunch.util.DistCache;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-class CrunchTaskContext {
-
- private final TaskInputOutputContext<Object, Object, Object, Object> taskContext;
- private final NodeContext nodeContext;
- private CrunchOutputs<Object, Object> multipleOutputs;
-
- public CrunchTaskContext(TaskInputOutputContext<Object, Object, Object, Object> taskContext, NodeContext nodeContext) {
- this.taskContext = taskContext;
- this.nodeContext = nodeContext;
- }
-
- public TaskInputOutputContext<Object, Object, Object, Object> getContext() {
- return taskContext;
- }
-
- public NodeContext getNodeContext() {
- return nodeContext;
- }
-
- public List<RTNode> getNodes() throws IOException {
- Configuration conf = taskContext.getConfiguration();
- Path path = new Path(new Path(conf.get(PlanningParameters.CRUNCH_WORKING_DIRECTORY)), nodeContext.toString());
- @SuppressWarnings("unchecked")
- List<RTNode> nodes = (List<RTNode>) DistCache.read(conf, path);
- if (nodes != null) {
- for (RTNode node : nodes) {
- node.initialize(this);
- }
- }
- return nodes;
- }
-
- public boolean isDebugRun() {
- Configuration conf = taskContext.getConfiguration();
- return conf.getBoolean(RuntimeParameters.DEBUG, false);
- }
-
- public void cleanup() {
- if (multipleOutputs != null) {
- try {
- multipleOutputs.close();
- } catch (IOException e) {
- throw new CrunchRuntimeException(e);
- } catch (InterruptedException e) {
- throw new CrunchRuntimeException(e);
- }
- }
- }
-
- public CrunchOutputs<Object, Object> getMultipleOutputs() {
- if (multipleOutputs == null) {
- multipleOutputs = new CrunchOutputs<Object, Object>(taskContext);
- }
- return multipleOutputs;
- }
-}
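
getNodes() above deserializes the RTNode list from <crunch working directory>/<MAP|REDUCE|COMBINE> via DistCache.read(). A hedged sketch of the corresponding write side, assuming DistCache exposes a write(conf, path, value) counterpart to read(); the actual planner logic lives in JobPrototype:

    import java.io.IOException;
    import java.util.List;

    import org.apache.crunch.util.DistCache;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    class NodeSerializer {
      // Write the MAP-phase RTNode list to the location that
      // CrunchTaskContext.getNodes() reads from.
      static void writeMapNodes(Configuration conf, Path workingPath, List<RTNode> rtNodes)
          throws IOException {
        Path nodePath = new Path(workingPath, NodeContext.MAP.toString());
        DistCache.write(conf, nodePath, rtNodes); // assumed write() counterpart to read()
      }
    }
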
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java
deleted file mode 100644
index ffc9e7c..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.run;
-
-import org.apache.crunch.impl.mr.plan.DoNode;
-
-/**
- * Enum associated with a serialized {@link DoNode} instance, indicating whether the
- * node executes in the map, reduce, or combine phase of a particular MR job.
- *
- */
-public enum NodeContext {
- MAP,
- REDUCE,
- COMBINE;
-
- public String getConfigurationKey() {
- return "crunch.donode." + toString().toLowerCase();
- }
-}
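
For reference, the three keys this enum produces, derived directly from getConfigurationKey() above:

    NodeContext.MAP.getConfigurationKey();     // "crunch.donode.map"
    NodeContext.REDUCE.getConfigurationKey();  // "crunch.donode.reduce"
    NodeContext.COMBINE.getConfigurationKey(); // "crunch.donode.combine"
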
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
deleted file mode 100644
index ce7b795..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.run;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.impl.mr.emit.IntermediateEmitter;
-import org.apache.crunch.impl.mr.emit.MultipleOutputEmitter;
-import org.apache.crunch.impl.mr.emit.OutputEmitter;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.PType;
-
-public class RTNode implements Serializable {
-
- private static final Log LOG = LogFactory.getLog(RTNode.class);
-
- private final String nodeName;
- private DoFn<Object, Object> fn;
- private PType<Object> outputPType;
- private final List<RTNode> children;
- private final Converter inputConverter;
- private final Converter outputConverter;
- private final String outputName;
-
- private transient Emitter<Object> emitter;
-
- public RTNode(DoFn<Object, Object> fn, PType<Object> outputPType, String name, List<RTNode> children,
- Converter inputConverter,
- Converter outputConverter, String outputName) {
- this.fn = fn;
- this.outputPType = outputPType;
- this.nodeName = name;
- this.children = children;
- this.inputConverter = inputConverter;
- this.outputConverter = outputConverter;
- this.outputName = outputName;
- }
-
- public void initialize(CrunchTaskContext ctxt) {
- if (emitter != null) {
- // Already initialized
- return;
- }
-
- fn.setContext(ctxt.getContext());
- fn.initialize();
- for (RTNode child : children) {
- child.initialize(ctxt);
- }
-
- if (outputConverter != null) {
- if (outputName != null) {
- this.emitter = new MultipleOutputEmitter(outputConverter, ctxt.getMultipleOutputs(),
- outputName);
- } else {
- this.emitter = new OutputEmitter(outputConverter, ctxt.getContext());
- }
- } else if (!children.isEmpty()) {
- this.emitter = new IntermediateEmitter(outputPType, children,
- ctxt.getContext().getConfiguration());
- } else {
- throw new CrunchRuntimeException("Invalid RTNode config: no emitter for: " + nodeName);
- }
- }
-
- public boolean isLeafNode() {
- return outputConverter != null && children.isEmpty();
- }
-
- public void process(Object input) {
- try {
- fn.process(input, emitter);
- } catch (CrunchRuntimeException e) {
- if (!e.wasLogged()) {
- LOG.info(String.format("Crunch exception in '%s' for input: %s", nodeName, input.toString()), e);
- e.markLogged();
- }
- throw e;
- }
- }
-
- public void process(Object key, Object value) {
- process(inputConverter.convertInput(key, value));
- }
-
- public void processIterable(Object key, Iterable values) {
- process(inputConverter.convertIterableInput(key, values));
- }
-
- public void cleanup() {
- fn.cleanup(emitter);
- emitter.flush();
- for (RTNode child : children) {
- child.cleanup();
- }
- }
-
- @Override
- public String toString() {
- return "RTNode [nodeName=" + nodeName + ", fn=" + fn + ", children=" + children + ", inputConverter="
- + inputConverter + ", outputConverter=" + outputConverter + ", outputName=" + outputName + "]";
- }
-}
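
initialize() above assigns exactly one emitter per node. A small illustrative restatement of that decision, not part of the class itself; the parameter names mirror RTNode's fields:

    import java.util.List;

    import org.apache.crunch.types.Converter;

    class EmitterChoice {
      // Mirrors the branch structure of RTNode.initialize().
      static String describe(Converter outputConverter, String outputName, List<?> children) {
        if (outputConverter != null) {
          // node writes to job output: named side-output vs. the main output
          return outputName != null
              ? "MultipleOutputEmitter(" + outputName + ")"
              : "OutputEmitter";
        } else if (!children.isEmpty()) {
          // interior node: records are forwarded to the child RTNodes
          return "IntermediateEmitter";
        }
        // a node with no emitter would silently drop data, so initialize() throws
        return "CrunchRuntimeException: no emitter";
      }
    }
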
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
deleted file mode 100644
index 604c49c..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.run;
-
-/**
- * Parameters used during runtime execution of a Crunch pipeline.
- */
-public class RuntimeParameters {
-
- public static final String AGGREGATOR_BUCKETS = "crunch.aggregator.buckets";
-
- public static final String DEBUG = "crunch.debug";
-
- public static final String TMP_DIR = "crunch.tmp.dir";
-
- public static final String LOG_JOB_PROGRESS = "crunch.log.job.progress";
-
- public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
-
- // Not instantiated
- private RuntimeParameters() {
- }
-}
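
These are ordinary Configuration keys. A hedged sketch of flipping on debug mode (DebugPipelineExample is a placeholder class; the MRPipeline(Class, Configuration) constructor is assumed):

    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.hadoop.conf.Configuration;

    public class DebugPipelineExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean(RuntimeParameters.DEBUG, true);   // CrunchTaskContext.isDebugRun() -> true
        conf.set(RuntimeParameters.TMP_DIR, "/tmp/crunch"); // placeholder path
        Pipeline pipeline = new MRPipeline(DebugPipelineExample.class, conf);
        // ... build and run the pipeline ...
      }
    }
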
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/At.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/At.java b/crunch/src/main/java/org/apache/crunch/io/At.java
deleted file mode 100644
index a6f0782..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/At.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.TableSourceTarget;
-import org.apache.crunch.io.avro.AvroFileSourceTarget;
-import org.apache.crunch.io.seq.SeqFileSourceTarget;
-import org.apache.crunch.io.seq.SeqFileTableSourceTarget;
-import org.apache.crunch.io.text.TextFileSourceTarget;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroType;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.crunch.types.writable.Writables;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-
-/**
- * <p>Static factory methods for creating common {@link SourceTarget} types, which may be treated as both a {@code Source}
- * and a {@code Target}.</p>
- *
- * <p>The {@code At} methods are analogous to the {@link From} and {@link To} factory methods, but are used for
- * storing intermediate outputs that need to be passed from one run of a MapReduce pipeline to another run. The
- * {@code SourceTarget} object acts as both a {@code Source} and a {@code Target}, which enables it to provide this
- * functionality.
- *
- * <code>
- * Pipeline pipeline = new MRPipeline(this.getClass());
- * // Create our intermediate storage location
- * SourceTarget<String> intermediate = At.textFile("/temptext");
- * ...
- * // Write out the output of the first phase of a pipeline.
- * pipeline.write(phase1, intermediate);
- *
- * // Explicitly call run to kick off the pipeline.
- * pipeline.run();
- *
- * // And then kick off a second phase by consuming the output
- * // from the first phase.
- * PCollection<String> phase2Input = pipeline.read(intermediate);
- * ...
- * </code>
- * </p>
- *
- * <p>The {@code SourceTarget} abstraction is useful when we care about reading the intermediate
- * outputs of a pipeline as well as the final results.</p>
- */
-public class At {
-
- /**
- * Creates a {@code SourceTarget<T>} instance from the Avro file(s) at the given path name.
- *
- * @param pathName The name of the path to the data on the filesystem
- * @param avroClass The subclass of {@code SpecificRecord} to use for the Avro file
- * @return A new {@code SourceTarget<T>} instance
- */
- public static <T extends SpecificRecord> SourceTarget<T> avroFile(String pathName, Class<T> avroClass) {
- return avroFile(new Path(pathName), avroClass);
- }
-
- /**
- * Creates a {@code SourceTarget<T>} instance from the Avro file(s) at the given {@code Path}.
- *
- * @param path The {@code Path} to the data
- * @param avroClass The subclass of {@code SpecificRecord} to use for the Avro file
- * @return A new {@code SourceTarget<T>} instance
- */
- public static <T extends SpecificRecord> SourceTarget<T> avroFile(Path path, Class<T> avroClass) {
- return avroFile(path, Avros.specifics(avroClass));
- }
-
- /**
- * Creates a {@code SourceTarget<T>} instance from the Avro file(s) at the given path name.
- *
- * @param pathName The name of the path to the data on the filesystem
- * @param avroType The {@code AvroType} for the Avro records
- * @return A new {@code SourceTarget<T>} instance
- */
- public static <T> SourceTarget<T> avroFile(String pathName, AvroType<T> avroType) {
- return avroFile(new Path(pathName), avroType);
- }
-
- /**
- * Creates a {@code SourceTarget<T>} instance from the Avro file(s) at the given {@code Path}.
- *
- * @param path The {@code Path} to the data
- * @param avroType The {@code AvroType} for the Avro records
- * @return A new {@code SourceTarget<T>} instance
- */
- public static <T> SourceTarget<T> avroFile(Path path, AvroType<T> avroType) {
- return new AvroFileSourceTarget<T>(path, avroType);
- }
-
- /**
- * Creates a {@code SourceTarget<T>} instance that reads the value field of each
- * key-value pair in the SequenceFile(s) at the given path name.
- *
- * @param pathName The name of the path to the data on the filesystem
- * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
- * @return A new {@code SourceTarget<T>} instance
- */
- public static <T extends Writable> SourceTarget<T> sequenceFile(String pathName, Class<T> valueClass) {
- return sequenceFile(new Path(pathName), valueClass);
- }
-
- /**
- * Creates a {@code SourceTarget<T>} instance that reads the value field of each
- * key-value pair in the SequenceFile(s) at the given {@code Path}.
- *
- * @param path The {@code Path} to the data
- * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
- * @return A new {@code SourceTarget<T>} instance
- */
- public static <T extends Writable> SourceTarget<T> sequenceFile(Path path, Class<T> valueClass) {
- return sequenceFile(path, Writables.writables(valueClass));
- }
-
- /**
- * Creates a {@code SourceTarget<T>} instance that reads the value field of each
- * key-value pair in the SequenceFile(s) at the given path name.
- *
- * @param pathName The name of the path to the data on the filesystem
- * @param ptype The {@code PType} for the value of the SequenceFile entry
- * @return A new {@code SourceTarget<T>} instance
- */
- public static <T> SourceTarget<T> sequenceFile(String pathName, PType<T> ptype) {
- return sequenceFile(new Path(pathName), ptype);
- }
-
- /**
- * Creates a {@code SourceTarget<T>} instance that reads the value field of each
- * key-value pair in the SequenceFile(s) at the given {@code Path}.
- *
- * @param path The {@code Path} to the data
- * @param ptype The {@code PType} for the value of the SequenceFile entry
- * @return A new {@code SourceTarget<T>} instance
- */
- public static <T> SourceTarget<T> sequenceFile(Path path, PType<T> ptype) {
- return new SeqFileSourceTarget<T>(path, ptype);
- }
-
- /**
- * Creates a {@code TableSourceTarget<K, V>} instance that reads the key-value pairs
- * in the SequenceFile(s) at the given path name.
- *
- * @param pathName The name of the path to the data on the filesystem
- * @param keyClass The {@code Writable} type for the key of the SequenceFile entry
- * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
- * @return A new {@code TableSourceTarget<K, V>} instance
- */
- public static <K extends Writable, V extends Writable> TableSourceTarget<K, V> sequenceFile(
- String pathName, Class<K> keyClass, Class<V> valueClass) {
- return sequenceFile(new Path(pathName), keyClass, valueClass);
- }
-
- /**
- * Creates a {@code TableSourceTarget<K, V>} instance that reads the key-value pairs
- * in the SequenceFile(s) at the given {@code Path}.
- *
- * @param path The {@code Path} to the data
- * @param keyClass The {@code Writable} type for the key of the SequenceFile entry
- * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
- * @return A new {@code TableSourceTarget<K, V>} instance
- */
- public static <K extends Writable, V extends Writable> TableSourceTarget<K, V> sequenceFile(
- Path path, Class<K> keyClass, Class<V> valueClass) {
- return sequenceFile(path, Writables.writables(keyClass), Writables.writables(valueClass));
- }
-
- /**
- * Creates a {@code TableSourceTarget<K, V>} instance that reads the key-value pairs
- * in the SequenceFile(s) at the given path name.
- *
- * @param pathName The name of the path to the data on the filesystem
- * @param keyType The {@code PType} for the key of the SequenceFile entry
- * @param valueType The {@code PType} for the value of the SequenceFile entry
- * @return A new {@code TableSourceTarget<K, V>} instance
- */
- public static <K, V> TableSourceTarget<K, V> sequenceFile(String pathName, PType<K> keyType, PType<V> valueType) {
- return sequenceFile(new Path(pathName), keyType, valueType);
- }
-
- /**
- * Creates a {@code TableSourceTarget<K, V>} instance that reads the key-value pairs
- * in the SequenceFile(s) at the given {@code Path}.
- *
- * @param path The {@code Path} to the data
- * @param keyType The {@code PType} for the key of the SequenceFile entry
- * @param valueType The {@code PType} for the value of the SequenceFile entry
- * @return A new {@code TableSourceTarget<K, V>} instance
- */
- public static <K, V> TableSourceTarget<K, V> sequenceFile(Path path, PType<K> keyType, PType<V> valueType) {
- PTypeFamily ptf = keyType.getFamily();
- return new SeqFileTableSourceTarget<K, V>(path, ptf.tableOf(keyType, valueType));
- }
-
- /**
- * Creates a {@code SourceTarget<String>} instance for the text file(s) at the given path name.
- *
- * @param pathName The name of the path to the data on the filesystem
- * @return A new {@code SourceTarget<String>} instance
- */
- public static SourceTarget<String> textFile(String pathName) {
- return textFile(new Path(pathName));
- }
-
- /**
- * Creates a {@code SourceTarget<String>} instance for the text file(s) at the given {@code Path}.
- *
- * @param path The {@code Path} to the data
- * @return A new {@code SourceTarget<String>} instance
- */
- public static SourceTarget<String> textFile(Path path) {
- return textFile(path, Writables.strings());
- }
-
- /**
- * Creates a {@code SourceTarget<T>} instance for the text file(s) at the given path name using
- * the provided {@code PType<T>} to convert the input text.
- *
- * @param pathName The name of the path to the data on the filesystem
- * @param ptype The {@code PType<T>} to use to process the input text
- * @return A new {@code SourceTarget<T>} instance
- */
- public static <T> SourceTarget<T> textFile(String pathName, PType<T> ptype) {
- return textFile(new Path(pathName), ptype);
- }
-
- /**
- * Creates a {@code SourceTarget<T>} instance for the text file(s) at the given {@code Path} using
- * the provided {@code PType<T>} to convert the input text.
- *
- * @param path The {@code Path} to the data
- * @param ptype The {@code PType<T>} to use to process the input text
- * @return A new {@code SourceTarget<T>} instance
- */
- public static <T> SourceTarget<T> textFile(Path path, PType<T> ptype) {
- return new TextFileSourceTarget<T>(path, ptype);
- }
-}
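
A hedged sketch pairing At with the two-run pattern shown in the class Javadoc, here using the sequenceFile factory; the path, the wordCounts table, and the pipeline are placeholders, and a Pipeline.read overload for table sources is assumed:

    import org.apache.crunch.PTable;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.TableSourceTarget;
    import org.apache.crunch.io.At;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    class IntermediateTableExample {
      // Persist an intermediate PTable between two runs of the same pipeline.
      static PTable<Text, LongWritable> checkpoint(Pipeline pipeline,
          PTable<Text, LongWritable> wordCounts) {
        TableSourceTarget<Text, LongWritable> counts =
            At.sequenceFile("/tmp/counts", Text.class, LongWritable.class);
        pipeline.write(wordCounts, counts);
        pipeline.run();
        return pipeline.read(counts); // assumed table-source read overload
      }
    }
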
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/CompositePathIterable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/CompositePathIterable.java b/crunch/src/main/java/org/apache/crunch/io/CompositePathIterable.java
deleted file mode 100644
index a4723e9..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/CompositePathIterable.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-
-import com.google.common.collect.UnmodifiableIterator;
-
-public class CompositePathIterable<T> implements Iterable<T> {
-
- private final FileStatus[] stati;
- private final FileSystem fs;
- private final FileReaderFactory<T> readerFactory;
-
- private static final PathFilter FILTER = new PathFilter() {
- @Override
- public boolean accept(Path path) {
- return !path.getName().startsWith("_");
- }
- };
-
- public static <S> Iterable<S> create(FileSystem fs, Path path, FileReaderFactory<S> readerFactory) throws IOException {
-
- if (!fs.exists(path)) {
- throw new IOException("No files found to materialize at: " + path);
- }
-
- FileStatus[] stati = null;
- try {
- stati = fs.listStatus(path, FILTER);
- } catch (FileNotFoundException e) {
- stati = null;
- }
- if (stati == null) {
- throw new IOException("No files found to materialize at: " + path);
- }
-
- if (stati.length == 0) {
- return Collections.emptyList();
- } else {
- return new CompositePathIterable<S>(stati, fs, readerFactory);
- }
-
- }
-
- private CompositePathIterable(FileStatus[] stati, FileSystem fs, FileReaderFactory<T> readerFactory) {
- this.stati = stati;
- this.fs = fs;
- this.readerFactory = readerFactory;
- }
-
- @Override
- public Iterator<T> iterator() {
-
- return new UnmodifiableIterator<T>() {
- private int index = 0;
- private Iterator<T> iter = readerFactory.read(fs, stati[index++].getPath());
-
- @Override
- public boolean hasNext() {
- if (!iter.hasNext()) {
- while (index < stati.length) {
- iter = readerFactory.read(fs, stati[index++].getPath());
- if (iter.hasNext()) {
- return true;
- }
- }
- return false;
- }
- return true;
- }
-
- @Override
- public T next() {
- return iter.next();
- }
- };
- }
-}
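
CompositePathIterable.create() chains one reader per visible file, with names starting with "_" filtered out by FILTER above. A hedged usage sketch, assuming a FileReaderFactory<String> for text lines is supplied by the caller:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class MaterializeExample {
      // lineReaderFactory is a placeholder FileReaderFactory<String>.
      static void printAll(FileReaderFactory<String> lineReaderFactory) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        Iterable<String> lines =
            CompositePathIterable.create(fs, new Path("/data/output"), lineReaderFactory);
        for (String line : lines) {
          System.out.println(line); // iterates seamlessly across all matching files
        }
      }
    }
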
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/CrunchInputs.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/CrunchInputs.java b/crunch/src/main/java/org/apache/crunch/io/CrunchInputs.java
deleted file mode 100644
index d154db2..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/CrunchInputs.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Helper functions for configuring multiple {@code InputFormat} instances within a single
- * Crunch MapReduce job.
- */
-public class CrunchInputs {
- public static final String CRUNCH_INPUTS = "crunch.inputs.dir";
-
- private static final char RECORD_SEP = ',';
- private static final char FIELD_SEP = ';';
- private static final Joiner JOINER = Joiner.on(FIELD_SEP);
- private static final Splitter SPLITTER = Splitter.on(FIELD_SEP);
-
- public static void addInputPath(Job job, Path path, FormatBundle inputBundle, int nodeIndex) {
- Configuration conf = job.getConfiguration();
- String inputs = JOINER.join(inputBundle.serialize(), String.valueOf(nodeIndex), path.toString());
- String existing = conf.get(CRUNCH_INPUTS);
- conf.set(CRUNCH_INPUTS, existing == null ? inputs : existing + RECORD_SEP + inputs);
- }
-
- public static Map<FormatBundle, Map<Integer, List<Path>>> getFormatNodeMap(JobContext job) {
- Map<FormatBundle, Map<Integer, List<Path>>> formatNodeMap = Maps.newHashMap();
- Configuration conf = job.getConfiguration();
- for (String input : Splitter.on(RECORD_SEP).split(conf.get(CRUNCH_INPUTS))) {
- List<String> fields = Lists.newArrayList(SPLITTER.split(input));
- FormatBundle<InputFormat> inputBundle = FormatBundle.fromSerialized(fields.get(0), InputFormat.class);
- if (!formatNodeMap.containsKey(inputBundle)) {
- formatNodeMap.put(inputBundle, Maps.<Integer, List<Path>> newHashMap());
- }
- Integer nodeIndex = Integer.valueOf(fields.get(1));
- if (!formatNodeMap.get(inputBundle).containsKey(nodeIndex)) {
- formatNodeMap.get(inputBundle).put(nodeIndex, Lists.<Path> newLinkedList());
- }
- formatNodeMap.get(inputBundle).get(nodeIndex).add(new Path(fields.get(2)));
- }
- return formatNodeMap;
- }
-
-}
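
From addInputPath() above, each record stored under crunch.inputs.dir has the form <serialized FormatBundle>;<node index>;<path>, with records separated by commas. A hedged sketch of the round trip (bundleA and bundleB are placeholder FormatBundle instances, and the paths are illustrative):

    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;

    class InputsExample {
      static void register(Job job, FormatBundle bundleA, FormatBundle bundleB) {
        CrunchInputs.addInputPath(job, new Path("/data/a"), bundleA, 0);
        CrunchInputs.addInputPath(job, new Path("/data/b"), bundleB, 1);
        // crunch.inputs.dir now holds:
        //   <bundleA serialized>;0;/data/a,<bundleB serialized>;1;/data/b
        Map<FormatBundle, Map<Integer, List<Path>>> byFormat =
            CrunchInputs.getFormatNodeMap(job);
      }
    }
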