You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:14 UTC
[12/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
deleted file mode 100644
index 6ea9c4c..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.collect;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.DoFn;
-import org.apache.crunch.FilterFn;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PObject;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.ParallelDoOptions;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.Target;
-import org.apache.crunch.fn.ExtractKeyFn;
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.impl.mr.plan.DoNode;
-import org.apache.crunch.lib.Aggregate;
-import org.apache.crunch.materialize.pobject.CollectionPObject;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-public abstract class PCollectionImpl<S> implements PCollection<S> {
-
- private static final Log LOG = LogFactory.getLog(PCollectionImpl.class);
-
- private final String name;
- protected MRPipeline pipeline;
- protected SourceTarget<S> materializedAt;
- private final ParallelDoOptions options;
-
- public PCollectionImpl(String name) {
- this(name, ParallelDoOptions.builder().build());
- }
-
- public PCollectionImpl(String name, ParallelDoOptions options) {
- this.name = name;
- this.options = options;
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public String toString() {
- return getName();
- }
-
- @Override
- public PCollection<S> union(PCollection<S> other) {
- return union(new PCollection[] { other });
- }
-
- @Override
- public PCollection<S> union(PCollection<S>... collections) {
- List<PCollectionImpl<S>> internal = Lists.newArrayList();
- internal.add(this);
- for (PCollection<S> collection : collections) {
- internal.add((PCollectionImpl<S>) collection.parallelDo(IdentityFn.<S>getInstance(), collection.getPType()));
- }
- return new UnionCollection<S>(internal);
- }
-
- @Override
- public <T> PCollection<T> parallelDo(DoFn<S, T> fn, PType<T> type) {
- MRPipeline pipeline = (MRPipeline) getPipeline();
- return parallelDo("S" + pipeline.getNextAnonymousStageId(), fn, type);
- }
-
- @Override
- public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type) {
- return new DoCollectionImpl<T>(name, getChainingCollection(), fn, type);
- }
-
- @Override
- public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type,
- ParallelDoOptions options) {
- return new DoCollectionImpl<T>(name, getChainingCollection(), fn, type, options);
- }
-
- @Override
- public <K, V> PTable<K, V> parallelDo(DoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
- MRPipeline pipeline = (MRPipeline) getPipeline();
- return parallelDo("S" + pipeline.getNextAnonymousStageId(), fn, type);
- }
-
- @Override
- public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
- return new DoTableImpl<K, V>(name, getChainingCollection(), fn, type);
- }
-
- @Override
- public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> fn, PTableType<K, V> type,
- ParallelDoOptions options) {
- return new DoTableImpl<K, V>(name, getChainingCollection(), fn, type, options);
- }
-
- public PCollection<S> write(Target target) {
- if (materializedAt != null) {
- getPipeline().write(new InputCollection<S>(materializedAt, (MRPipeline) getPipeline()), target);
- } else {
- getPipeline().write(this, target);
- }
- return this;
- }
-
- @Override
- public PCollection<S> write(Target target, Target.WriteMode writeMode) {
- if (materializedAt != null) {
- getPipeline().write(new InputCollection<S>(materializedAt, (MRPipeline) getPipeline()), target,
- writeMode);
- } else {
- getPipeline().write(this, target, writeMode);
- }
- return this;
- }
-
- @Override
- public Iterable<S> materialize() {
- if (getSize() == 0) {
- LOG.warn("Materializing an empty PCollection: " + this.getName());
- return Collections.emptyList();
- }
- return getPipeline().materialize(this);
- }
-
- /** {@inheritDoc} */
- @Override
- public PObject<Collection<S>> asCollection() {
- return new CollectionPObject<S>(this);
- }
-
- public SourceTarget<S> getMaterializedAt() {
- return materializedAt;
- }
-
- public void materializeAt(SourceTarget<S> sourceTarget) {
- this.materializedAt = sourceTarget;
- }
-
- @Override
- public PCollection<S> filter(FilterFn<S> filterFn) {
- return parallelDo(filterFn, getPType());
- }
-
- @Override
- public PCollection<S> filter(String name, FilterFn<S> filterFn) {
- return parallelDo(name, filterFn, getPType());
- }
-
- @Override
- public <K> PTable<K, S> by(MapFn<S, K> mapFn, PType<K> keyType) {
- return parallelDo(new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
- }
-
- @Override
- public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) {
- return parallelDo(name, new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
- }
-
- @Override
- public PTable<S, Long> count() {
- return Aggregate.count(this);
- }
-
- @Override
- public PObject<Long> length() {
- return Aggregate.length(this);
- }
-
- @Override
- public PObject<S> max() {
- return Aggregate.max(this);
- }
-
- @Override
- public PObject<S> min() {
- return Aggregate.min(this);
- }
-
- @Override
- public PTypeFamily getTypeFamily() {
- return getPType().getFamily();
- }
-
- public abstract DoNode createDoNode();
-
- public abstract List<PCollectionImpl<?>> getParents();
-
- public PCollectionImpl<?> getOnlyParent() {
- List<PCollectionImpl<?>> parents = getParents();
- if (parents.size() != 1) {
- throw new IllegalArgumentException("Expected exactly one parent PCollection");
- }
- return parents.get(0);
- }
-
- @Override
- public Pipeline getPipeline() {
- if (pipeline == null) {
- pipeline = (MRPipeline) getParents().get(0).getPipeline();
- }
- return pipeline;
- }
-
- public Set<SourceTarget<?>> getTargetDependencies() {
- Set<SourceTarget<?>> targetDeps = options.getSourceTargets();
- for (PCollectionImpl<?> parent : getParents()) {
- targetDeps = Sets.union(targetDeps, parent.getTargetDependencies());
- }
- return targetDeps;
- }
-
- public int getDepth() {
- int parentMax = 0;
- for (PCollectionImpl parent : getParents()) {
- parentMax = Math.max(parent.getDepth(), parentMax);
- }
- return 1 + parentMax;
- }
-
- public interface Visitor {
- void visitInputCollection(InputCollection<?> collection);
-
- void visitUnionCollection(UnionCollection<?> collection);
-
- void visitDoFnCollection(DoCollectionImpl<?> collection);
-
- void visitDoTable(DoTableImpl<?, ?> collection);
-
- void visitGroupedTable(PGroupedTableImpl<?, ?> collection);
- }
-
- public void accept(Visitor visitor) {
- if (materializedAt != null) {
- visitor.visitInputCollection(new InputCollection<S>(materializedAt, (MRPipeline) getPipeline()));
- } else {
- acceptInternal(visitor);
- }
- }
-
- protected abstract void acceptInternal(Visitor visitor);
-
- @Override
- public long getSize() {
- if (materializedAt != null) {
- long sz = materializedAt.getSize(getPipeline().getConfiguration());
- if (sz > 0) {
- return sz;
- }
- }
- return getSizeInternal();
- }
-
- protected abstract long getSizeInternal();
-
- /**
- * Retrieve the PCollectionImpl to be used for chaining within PCollectionImpls further down the pipeline.
- * @return The PCollectionImpl instance to be chained
- */
- protected PCollectionImpl<S> getChainingCollection(){
- return this;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
deleted file mode 100644
index ccac5d5..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.collect;
-
-import java.util.List;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.Aggregator;
-import org.apache.crunch.CombineFn;
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.PGroupedTable;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.fn.Aggregators;
-import org.apache.crunch.impl.mr.plan.DoNode;
-import org.apache.crunch.types.PGroupedTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.util.PartitionUtils;
-import org.apache.hadoop.mapreduce.Job;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-
-public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>>> implements PGroupedTable<K, V> {
-
- private static final Log LOG = LogFactory.getLog(PGroupedTableImpl.class);
-
- private final PTableBase<K, V> parent;
- private final GroupingOptions groupingOptions;
- private final PGroupedTableType<K, V> ptype;
-
- PGroupedTableImpl(PTableBase<K, V> parent) {
- this(parent, null);
- }
-
- PGroupedTableImpl(PTableBase<K, V> parent, GroupingOptions groupingOptions) {
- super("GBK");
- this.parent = parent;
- this.groupingOptions = groupingOptions;
- this.ptype = parent.getPTableType().getGroupedTableType();
- }
-
- public void configureShuffle(Job job) {
- ptype.configureShuffle(job, groupingOptions);
- if (groupingOptions == null || groupingOptions.getNumReducers() <= 0) {
- int numReduceTasks = PartitionUtils.getRecommendedPartitions(this, getPipeline().getConfiguration());
- if (numReduceTasks > 0) {
- job.setNumReduceTasks(numReduceTasks);
- LOG.info(String.format("Setting num reduce tasks to %d", numReduceTasks));
- } else {
- LOG.warn("Attempted to set a negative number of reduce tasks");
- }
- }
- }
-
- @Override
- protected long getSizeInternal() {
- return parent.getSizeInternal();
- }
-
- @Override
- public PType<Pair<K, Iterable<V>>> getPType() {
- return ptype;
- }
-
- @Override
- public PTable<K, V> combineValues(CombineFn<K, V> combineFn) {
- return new DoTableImpl<K, V>("combine", getChainingCollection(), combineFn, parent.getPTableType());
- }
-
- @Override
- public PTable<K, V> combineValues(Aggregator<V> agg) {
- return combineValues(Aggregators.<K, V>toCombineFn(agg));
- }
-
- private static class Ungroup<K, V> extends DoFn<Pair<K, Iterable<V>>, Pair<K, V>> {
- @Override
- public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) {
- for (V v : input.second()) {
- emitter.emit(Pair.of(input.first(), v));
- }
- }
- }
-
- public PTable<K, V> ungroup() {
- return parallelDo("ungroup", new Ungroup<K, V>(), parent.getPTableType());
- }
-
- @Override
- protected void acceptInternal(PCollectionImpl.Visitor visitor) {
- visitor.visitGroupedTable(this);
- }
-
- @Override
- public Set<SourceTarget<?>> getTargetDependencies() {
- Set<SourceTarget<?>> td = Sets.newHashSet(super.getTargetDependencies());
- if (groupingOptions != null) {
- td.addAll(groupingOptions.getSourceTargets());
- }
- return ImmutableSet.copyOf(td);
- }
-
- @Override
- public List<PCollectionImpl<?>> getParents() {
- return ImmutableList.<PCollectionImpl<?>> of(parent);
- }
-
- @Override
- public DoNode createDoNode() {
- return DoNode.createFnNode(getName(), ptype.getInputMapFn(), ptype);
- }
-
- public DoNode getGroupingNode() {
- return DoNode.createGroupingNode("", ptype);
- }
-
- @Override
- protected PCollectionImpl<Pair<K, Iterable<V>>> getChainingCollection() {
- // Use a copy for chaining to allow sending the output of a single grouped table to multiple outputs
- // TODO This should be implemented in a cleaner way in the planner
- return new PGroupedTableImpl<K, V>(parent, groupingOptions);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
deleted file mode 100644
index 3c2393d..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.collect;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.crunch.FilterFn;
-import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PObject;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.ParallelDoOptions;
-import org.apache.crunch.TableSource;
-import org.apache.crunch.Target;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.lib.Aggregate;
-import org.apache.crunch.lib.Cogroup;
-import org.apache.crunch.lib.Join;
-import org.apache.crunch.lib.PTables;
-import org.apache.crunch.materialize.MaterializableMap;
-import org.apache.crunch.materialize.pobject.MapPObject;
-import org.apache.crunch.types.PType;
-
-import com.google.common.collect.Lists;
-
-abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> implements PTable<K, V> {
-
- public PTableBase(String name) {
- super(name);
- }
-
- public PTableBase(String name, ParallelDoOptions options) {
- super(name, options);
- }
-
- public PType<K> getKeyType() {
- return getPTableType().getKeyType();
- }
-
- public PType<V> getValueType() {
- return getPTableType().getValueType();
- }
-
- public PGroupedTableImpl<K, V> groupByKey() {
- return new PGroupedTableImpl<K, V>(this);
- }
-
- public PGroupedTableImpl<K, V> groupByKey(int numReduceTasks) {
- return new PGroupedTableImpl<K, V>(this, GroupingOptions.builder().numReducers(numReduceTasks).build());
- }
-
- public PGroupedTableImpl<K, V> groupByKey(GroupingOptions groupingOptions) {
- return new PGroupedTableImpl<K, V>(this, groupingOptions);
- }
-
- @Override
- public PTable<K, V> union(PTable<K, V> other) {
- return union(new PTable[] { other });
- }
-
- @Override
- public PTable<K, V> union(PTable<K, V>... others) {
- List<PTableBase<K, V>> internal = Lists.newArrayList();
- internal.add(this);
- for (PTable<K, V> table : others) {
- internal.add((PTableBase<K, V>) table);
- }
- return new UnionTable<K, V>(internal);
- }
-
- @Override
- public PTable<K, V> write(Target target) {
- if (getMaterializedAt() != null) {
- getPipeline().write(new InputTable<K, V>(
- (TableSource<K, V>) getMaterializedAt(), (MRPipeline) getPipeline()), target);
- } else {
- getPipeline().write(this, target);
- }
- return this;
- }
-
- @Override
- public PTable<K, V> write(Target target, Target.WriteMode writeMode) {
- if (getMaterializedAt() != null) {
- getPipeline().write(new InputTable<K, V>(
- (TableSource<K, V>) getMaterializedAt(), (MRPipeline) getPipeline()), target, writeMode);
- } else {
- getPipeline().write(this, target, writeMode);
- }
- return this;
- }
-
- @Override
- public PTable<K, V> filter(FilterFn<Pair<K, V>> filterFn) {
- return parallelDo(filterFn, getPTableType());
- }
-
- @Override
- public PTable<K, V> filter(String name, FilterFn<Pair<K, V>> filterFn) {
- return parallelDo(name, filterFn, getPTableType());
- }
-
- @Override
- public PTable<K, V> top(int count) {
- return Aggregate.top(this, count, true);
- }
-
- @Override
- public PTable<K, V> bottom(int count) {
- return Aggregate.top(this, count, false);
- }
-
- @Override
- public PTable<K, Collection<V>> collectValues() {
- return Aggregate.collectValues(this);
- }
-
- @Override
- public <U> PTable<K, Pair<V, U>> join(PTable<K, U> other) {
- return Join.join(this, other);
- }
-
- @Override
- public <U> PTable<K, Pair<Collection<V>, Collection<U>>> cogroup(PTable<K, U> other) {
- return Cogroup.cogroup(this, other);
- }
-
- @Override
- public PCollection<K> keys() {
- return PTables.keys(this);
- }
-
- @Override
- public PCollection<V> values() {
- return PTables.values(this);
- }
-
- /**
- * Returns a Map<K, V> made up of the keys and values in this PTable.
- */
- @Override
- public Map<K, V> materializeToMap() {
- return new MaterializableMap<K, V>(this.materialize());
- }
-
- /** {@inheritDoc} */
- @Override
- public PObject<Map<K, V>> asMap() {
- return new MapPObject<K, V>(this);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
deleted file mode 100644
index 7b3dd7b..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.collect;
-
-import java.util.List;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.impl.mr.plan.DoNode;
-import org.apache.crunch.types.PType;
-
-import com.google.common.collect.ImmutableList;
-
-public class UnionCollection<S> extends PCollectionImpl<S> {
-
- private List<PCollectionImpl<S>> parents;
- private long size = 0;
-
- private static <S> String flatName(List<PCollectionImpl<S>> collections) {
- StringBuilder sb = new StringBuilder("union(");
- for (int i = 0; i < collections.size(); i++) {
- if (i != 0) {
- sb.append(',');
- }
- sb.append(collections.get(i).getName());
- }
- return sb.append(')').toString();
- }
-
- UnionCollection(List<PCollectionImpl<S>> collections) {
- super(flatName(collections));
- this.parents = ImmutableList.copyOf(collections);
- this.pipeline = (MRPipeline) parents.get(0).getPipeline();
- for (PCollectionImpl<S> parent : parents) {
- if (this.pipeline != parent.getPipeline()) {
- throw new IllegalStateException("Cannot union PCollections from different Pipeline instances");
- }
- size += parent.getSize();
- }
- }
-
- @Override
- protected long getSizeInternal() {
- return size;
- }
-
- @Override
- protected void acceptInternal(PCollectionImpl.Visitor visitor) {
- visitor.visitUnionCollection(this);
- }
-
- @Override
- public PType<S> getPType() {
- return parents.get(0).getPType();
- }
-
- @Override
- public List<PCollectionImpl<?>> getParents() {
- return ImmutableList.<PCollectionImpl<?>> copyOf(parents);
- }
-
- @Override
- public DoNode createDoNode() {
- throw new UnsupportedOperationException("Unioned collection does not support DoNodes");
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
deleted file mode 100644
index a369432..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.collect;
-
-import java.util.List;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.impl.mr.plan.DoNode;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-public class UnionTable<K, V> extends PTableBase<K, V> {
-
- private PTableType<K, V> ptype;
- private List<PCollectionImpl<Pair<K, V>>> parents;
- private long size;
-
- private static <K, V> String flatName(List<PTableBase<K, V>> tables) {
- StringBuilder sb = new StringBuilder("union(");
- for (int i = 0; i < tables.size(); i++) {
- if (i != 0) {
- sb.append(',');
- }
- sb.append(tables.get(i).getName());
- }
- return sb.append(')').toString();
- }
-
- public UnionTable(List<PTableBase<K, V>> tables) {
- super(flatName(tables));
- this.ptype = tables.get(0).getPTableType();
- this.pipeline = (MRPipeline) tables.get(0).getPipeline();
- this.parents = Lists.newArrayList();
- for (PTableBase<K, V> parent : tables) {
- if (pipeline != parent.getPipeline()) {
- throw new IllegalStateException("Cannot union PTables from different Pipeline instances");
- }
- this.parents.add(parent);
- size += parent.getSize();
- }
- }
-
- @Override
- protected long getSizeInternal() {
- return size;
- }
-
- @Override
- public PTableType<K, V> getPTableType() {
- return ptype;
- }
-
- @Override
- public PType<Pair<K, V>> getPType() {
- return ptype;
- }
-
- @Override
- public List<PCollectionImpl<?>> getParents() {
- return ImmutableList.<PCollectionImpl<?>> copyOf(parents);
- }
-
- @Override
- protected void acceptInternal(PCollectionImpl.Visitor visitor) {
- visitor.visitUnionCollection(new UnionCollection<Pair<K, V>>(parents));
- }
-
- @Override
- public DoNode createDoNode() {
- throw new UnsupportedOperationException("Unioned table does not support do nodes");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
deleted file mode 100644
index b6df98b..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.emit;
-
-import java.util.List;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.impl.mr.run.RTNode;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * An {@link Emitter} implementation that links the output of one {@link DoFn} to the input of
- * another {@code DoFn}.
- *
- */
-public class IntermediateEmitter implements Emitter<Object> {
-
- private final List<RTNode> children;
- private final Configuration conf;
- private final PType<Object> outputPType;
- private final boolean needDetachedValues;
-
- public IntermediateEmitter(PType<Object> outputPType, List<RTNode> children, Configuration conf) {
- this.outputPType = outputPType;
- this.children = ImmutableList.copyOf(children);
- this.conf = conf;
-
- outputPType.initialize(conf);
- needDetachedValues = this.children.size() > 1;
- }
-
- public void emit(Object emitted) {
- for (RTNode child : children) {
- Object value = emitted;
- if (needDetachedValues) {
- value = this.outputPType.getDetachedValue(emitted);
- }
- child.process(value);
- }
- }
-
- public void flush() {
- // No-op
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java
deleted file mode 100644
index 2e58fed..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.emit;
-
-import java.io.IOException;
-
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.io.CrunchOutputs;
-import org.apache.crunch.types.Converter;
-
-public class MultipleOutputEmitter<T, K, V> implements Emitter<T> {
-
- private final Converter converter;
- private final CrunchOutputs<K, V> outputs;
- private final String outputName;
-
- public MultipleOutputEmitter(Converter converter, CrunchOutputs<K, V> outputs,
- String outputName) {
- this.converter = converter;
- this.outputs = outputs;
- this.outputName = outputName;
- }
-
- @Override
- public void emit(T emitted) {
- try {
- this.outputs.write(outputName,
- (K) converter.outputKey(emitted),
- (V) converter.outputValue(emitted));
- } catch (Exception e) {
- throw new CrunchRuntimeException(e);
- }
- }
-
- @Override
- public void flush() {
- // No-op
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/emit/OutputEmitter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/OutputEmitter.java b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/OutputEmitter.java
deleted file mode 100644
index bc3ae0d..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/OutputEmitter.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.emit;
-
-import java.io.IOException;
-
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.types.Converter;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-public class OutputEmitter<T, K, V> implements Emitter<T> {
-
- private final Converter<K, V, Object, Object> converter;
- private final TaskInputOutputContext<?, ?, K, V> context;
-
- public OutputEmitter(Converter<K, V, Object, Object> converter, TaskInputOutputContext<?, ?, K, V> context) {
- this.converter = converter;
- this.context = context;
- }
-
- public void emit(T emitted) {
- try {
- K key = converter.outputKey(emitted);
- V value = converter.outputValue(emitted);
- this.context.write(key, value);
- } catch (IOException e) {
- throw new CrunchRuntimeException(e);
- } catch (InterruptedException e) {
- throw new CrunchRuntimeException(e);
- }
- }
-
- public void flush() {
- // No-op
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounter.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounter.java
deleted file mode 100644
index d90f2e8..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.exec;
-
-/**
- * Generate a series of capped numbers exponentially.
- *
- * It is used for creating retry intervals. It is NOT thread-safe.
- */
-public class CappedExponentialCounter {
-
- private long current;
- private final long limit;
-
- public CappedExponentialCounter(long start, long limit) {
- this.current = start;
- this.limit = limit;
- }
-
- public long get() {
- long result = current;
- current = Math.min(current * 2, limit);
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
deleted file mode 100644
index 74bc9ac..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.exec;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
-import org.apache.crunch.impl.mr.plan.PlanningParameters;
-import org.apache.crunch.impl.mr.run.RuntimeParameters;
-import org.apache.crunch.io.FileNamingScheme;
-import org.apache.crunch.io.PathTarget;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-public final class CrunchJobHooks {
-
- private CrunchJobHooks() {}
-
- /** Creates missing input directories before job is submitted. */
- public static final class PrepareHook implements CrunchControlledJob.Hook {
- private final Job job;
-
- public PrepareHook(Job job) {
- this.job = job;
- }
-
- @Override
- public void run() throws IOException {
- Configuration conf = job.getConfiguration();
- if (conf.getBoolean(RuntimeParameters.CREATE_DIR, false)) {
- Path[] inputPaths = FileInputFormat.getInputPaths(job);
- for (Path inputPath : inputPaths) {
- FileSystem fs = inputPath.getFileSystem(conf);
- if (!fs.exists(inputPath)) {
- try {
- fs.mkdirs(inputPath);
- } catch (IOException e) {
- }
- }
- }
- }
- }
- }
-
- /** Moving output files produced by the MapReduce job to specified directories. */
- public static final class CompletionHook implements CrunchControlledJob.Hook {
- private final Job job;
- private final Path workingPath;
- private final Map<Integer, PathTarget> multiPaths;
- private final boolean mapOnlyJob;
-
- public CompletionHook(Job job, Path workingPath, Map<Integer, PathTarget> multiPaths, boolean mapOnlyJob) {
- this.job = job;
- this.workingPath = workingPath;
- this.multiPaths = multiPaths;
- this.mapOnlyJob = mapOnlyJob;
- }
-
- @Override
- public void run() throws IOException {
- handleMultiPaths();
- }
-
- private synchronized void handleMultiPaths() throws IOException {
- if (!multiPaths.isEmpty()) {
- // Need to handle moving the data from the output directory of the
- // job to the output locations specified in the paths.
- FileSystem srcFs = workingPath.getFileSystem(job.getConfiguration());
- for (Map.Entry<Integer, PathTarget> entry : multiPaths.entrySet()) {
- final int i = entry.getKey();
- final Path dst = entry.getValue().getPath();
- FileNamingScheme fileNamingScheme = entry.getValue().getFileNamingScheme();
-
- Path src = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + i + "-*");
- Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src);
- Configuration conf = job.getConfiguration();
- FileSystem dstFs = dst.getFileSystem(conf);
- if (!dstFs.exists(dst)) {
- dstFs.mkdirs(dst);
- }
- boolean sameFs = isCompatible(srcFs, dst);
- for (Path s : srcs) {
- Path d = getDestFile(conf, s, dst, fileNamingScheme);
- if (sameFs) {
- srcFs.rename(s, d);
- } else {
- FileUtil.copy(srcFs, s, dstFs, d, true, true, job.getConfiguration());
- }
- }
- }
- }
- }
-
- private boolean isCompatible(FileSystem fs, Path path) {
- try {
- fs.makeQualified(path);
- return true;
- } catch (IllegalArgumentException e) {
- return false;
- }
- }
- private Path getDestFile(Configuration conf, Path src, Path dir, FileNamingScheme fileNamingScheme)
- throws IOException {
- String outputFilename = null;
- if (mapOnlyJob) {
- outputFilename = fileNamingScheme.getMapOutputName(conf, dir);
- } else {
- outputFilename = fileNamingScheme.getReduceOutputName(conf, dir, extractPartitionNumber(src.getName()));
- }
- if (src.getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) {
- outputFilename += org.apache.avro.mapred.AvroOutputFormat.EXT;
- }
- return new Path(dir, outputFilename);
- }
- }
-
- /**
- * Extract the partition number from a raw reducer output filename.
- *
- * @param reduceOutputFileName The raw reducer output file name
- * @return The partition number encoded in the filename
- */
- static int extractPartitionNumber(String reduceOutputFileName) {
- Matcher matcher = Pattern.compile(".*-r-(\\d{5})").matcher(reduceOutputFileName);
- if (matcher.find()) {
- return Integer.parseInt(matcher.group(1), 10);
- } else {
- throw new IllegalArgumentException("Reducer output name '" + reduceOutputFileName + "' cannot be parsed");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
deleted file mode 100644
index 4c7b7ea..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.exec;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.PipelineExecution;
-import org.apache.crunch.PipelineResult;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.Target;
-import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
-import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl;
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
-import org.apache.crunch.materialize.MaterializableIterable;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.collect.Lists;
-
-/**
- * Provides APIs for job control at runtime to clients.
- *
- * This class has a thread that submits jobs when they become ready, monitors
- * the states of the running jobs, and updates the states of jobs based on the
- * state changes of their depending jobs states.
- *
- * It is thread-safe.
- */
-public class MRExecutor implements PipelineExecution {
-
- private static final Log LOG = LogFactory.getLog(MRExecutor.class);
-
- private final CrunchJobControl control;
- private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
- private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
- private final CountDownLatch doneSignal = new CountDownLatch(1);
- private final CountDownLatch killSignal = new CountDownLatch(1);
- private final CappedExponentialCounter pollInterval;
- private AtomicReference<Status> status = new AtomicReference<Status>(Status.READY);
- private PipelineResult result;
- private Thread monitorThread;
-
- private String planDotFile;
-
- public MRExecutor(Class<?> jarClass, Map<PCollectionImpl<?>, Set<Target>> outputTargets,
- Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize) {
- this.control = new CrunchJobControl(jarClass.toString());
- this.outputTargets = outputTargets;
- this.toMaterialize = toMaterialize;
- this.monitorThread = new Thread(new Runnable() {
- @Override
- public void run() {
- monitorLoop();
- }
- });
- this.pollInterval = isLocalMode()
- ? new CappedExponentialCounter(50, 1000)
- : new CappedExponentialCounter(500, 10000);
- }
-
- public void addJob(CrunchControlledJob job) {
- this.control.addJob(job);
- }
-
- public void setPlanDotFile(String planDotFile) {
- this.planDotFile = planDotFile;
- }
-
- public PipelineExecution execute() {
- monitorThread.start();
- return this;
- }
-
- /** Monitors running status. It is called in {@code MonitorThread}. */
- private void monitorLoop() {
- try {
- while (killSignal.getCount() > 0 && !control.allFinished()) {
- control.pollJobStatusAndStartNewOnes();
- killSignal.await(pollInterval.get(), TimeUnit.MILLISECONDS);
- }
- control.killAllRunningJobs();
-
- List<CrunchControlledJob> failures = control.getFailedJobList();
- if (!failures.isEmpty()) {
- System.err.println(failures.size() + " job failure(s) occurred:");
- for (CrunchControlledJob job : failures) {
- System.err.println(job.getJobName() + "(" + job.getJobID() + "): " + job.getMessage());
- }
- }
- List<PipelineResult.StageResult> stages = Lists.newArrayList();
- for (CrunchControlledJob job : control.getSuccessfulJobList()) {
- stages.add(new PipelineResult.StageResult(job.getJobName(), job.getJob().getCounters()));
- }
-
- for (PCollectionImpl<?> c : outputTargets.keySet()) {
- if (toMaterialize.containsKey(c)) {
- MaterializableIterable iter = toMaterialize.get(c);
- if (iter.isSourceTarget()) {
- iter.materialize();
- c.materializeAt((SourceTarget) iter.getSource());
- }
- } else {
- boolean materialized = false;
- for (Target t : outputTargets.get(c)) {
- if (!materialized) {
- if (t instanceof SourceTarget) {
- c.materializeAt((SourceTarget) t);
- materialized = true;
- } else {
- SourceTarget st = t.asSourceTarget(c.getPType());
- if (st != null) {
- c.materializeAt(st);
- materialized = true;
- }
- }
- }
- }
- }
- }
-
- synchronized (this) {
- result = new PipelineResult(stages);
- if (killSignal.getCount() == 0) {
- status.set(Status.KILLED);
- } else {
- status.set(result.succeeded() ? Status.SUCCEEDED : Status.FAILED);
- }
- }
- } catch (InterruptedException e) {
- throw new AssertionError(e); // Nobody should interrupt us.
- } catch (IOException e) {
- LOG.error("Pipeline failed due to exception", e);
- status.set(Status.FAILED);
- } finally {
- doneSignal.countDown();
- }
- }
-
- @Override
- public String getPlanDotFile() {
- return planDotFile;
- }
-
- @Override
- public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException {
- doneSignal.await(timeout, timeUnit);
- }
-
- @Override
- public void waitUntilDone() throws InterruptedException {
- doneSignal.await();
- }
-
- @Override
- public synchronized Status getStatus() {
- return status.get();
- }
-
- @Override
- public synchronized PipelineResult getResult() {
- return result;
- }
-
- @Override
- public void kill() throws InterruptedException {
- killSignal.countDown();
- }
-
- private static boolean isLocalMode() {
- Configuration conf = new Configuration();
- // Try to handle MapReduce version 0.20 or 0.22
- String jobTrackerAddress = conf.get("mapreduce.jobtracker.address",
- conf.get("mapred.job.tracker", "local"));
- return "local".equals(jobTrackerAddress);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/package-info.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/package-info.java b/crunch/src/main/java/org/apache/crunch/impl/mr/package-info.java
deleted file mode 100644
index 7e403c3..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * A Pipeline implementation that runs on Hadoop MapReduce.
- */
-package org.apache.crunch.impl.mr;
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
deleted file mode 100644
index 865369c..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.plan;
-
-import java.util.List;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Source;
-import org.apache.crunch.impl.mr.run.NodeContext;
-import org.apache.crunch.impl.mr.run.RTNode;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.PGroupedTableType;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-public class DoNode {
-
- private static final List<DoNode> NO_CHILDREN = ImmutableList.of();
-
- private final DoFn fn;
- private final String name;
- private final PType<?> ptype;
- private final List<DoNode> children;
- private final Converter outputConverter;
- private final Source<?> source;
- private String outputName;
-
- private DoNode(DoFn fn, String name, PType<?> ptype, List<DoNode> children, Converter outputConverter,
- Source<?> source) {
- this.fn = fn;
- this.name = name;
- this.ptype = ptype;
- this.children = children;
- this.outputConverter = outputConverter;
- this.source = source;
- }
-
- private static List<DoNode> allowsChildren() {
- return Lists.newArrayList();
- }
-
- public static <K, V> DoNode createGroupingNode(String name, PGroupedTableType<K, V> ptype) {
- DoFn<?, ?> fn = ptype.getOutputMapFn();
- return new DoNode(fn, name, ptype, NO_CHILDREN, ptype.getGroupingConverter(), null);
- }
-
- public static <S> DoNode createOutputNode(String name, PType<S> ptype) {
- Converter outputConverter = ptype.getConverter();
- DoFn<?, ?> fn = ptype.getOutputMapFn();
- return new DoNode(fn, name, ptype, NO_CHILDREN, outputConverter, null);
- }
-
- public static DoNode createFnNode(String name, DoFn<?, ?> function, PType<?> ptype) {
- return new DoNode(function, name, ptype, allowsChildren(), null, null);
- }
-
- public static <S> DoNode createInputNode(Source<S> source) {
- PType<?> ptype = source.getType();
- DoFn<?, ?> fn = ptype.getInputMapFn();
- return new DoNode(fn, source.toString(), ptype, allowsChildren(), null, source);
- }
-
- public boolean isInputNode() {
- return source != null;
- }
-
- public boolean isOutputNode() {
- return outputConverter != null;
- }
-
- public String getName() {
- return name;
- }
-
- public List<DoNode> getChildren() {
- return children;
- }
-
- public Source<?> getSource() {
- return source;
- }
-
- public PType<?> getPType() {
- return ptype;
- }
-
- public DoNode addChild(DoNode node) {
- // TODO: This is sort of terrible, refactor the code to make this make more sense.
- boolean exists = false;
- for (DoNode child : children) {
- if (node == child) {
- exists = true;
- break;
- }
- }
- if (!exists) {
- children.add(node);
- }
- return this;
- }
-
- public void setOutputName(String outputName) {
- if (outputConverter == null) {
- throw new IllegalStateException("Cannot set output name w/o output converter: " + outputName);
- }
- this.outputName = outputName;
- }
-
- public RTNode toRTNode(boolean inputNode, Configuration conf, NodeContext nodeContext) {
- List<RTNode> childRTNodes = Lists.newArrayList();
- fn.configure(conf);
- for (DoNode child : children) {
- childRTNodes.add(child.toRTNode(false, conf, nodeContext));
- }
-
- Converter inputConverter = null;
- if (inputNode) {
- if (nodeContext == NodeContext.MAP) {
- inputConverter = ptype.getConverter();
- } else {
- inputConverter = ((PGroupedTableType<?, ?>) ptype).getGroupingConverter();
- }
- }
- return new RTNode(fn, (PType<Object>) getPType(), name, childRTNodes, inputConverter, outputConverter, outputName);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null || !(other instanceof DoNode)) {
- return false;
- }
- if (this == other) {
- return true;
- }
- DoNode o = (DoNode) other;
- return (name.equals(o.name) && fn.equals(o.fn) && source == o.source && outputConverter == o.outputConverter);
- }
-
- @Override
- public int hashCode() {
- HashCodeBuilder hcb = new HashCodeBuilder();
- return hcb.append(name).append(fn).append(source).append(outputConverter).toHashCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
deleted file mode 100644
index 46d8c53..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.plan;
-
-import java.util.List;
-import java.util.Set;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.Target;
-import org.apache.crunch.impl.mr.collect.InputCollection;
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
-import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * Writes <a href="http://www.graphviz.org">Graphviz</a> dot files to illustrate
- * the topology of Crunch pipelines.
- */
-public class DotfileWriter {
-
- /** The types of tasks within a MapReduce job. */
- enum MRTaskType { MAP, REDUCE };
-
- private Set<JobPrototype> jobPrototypes = Sets.newHashSet();
- private HashMultimap<Pair<JobPrototype, MRTaskType>, String> jobNodeDeclarations = HashMultimap.create();
- private Set<String> globalNodeDeclarations = Sets.newHashSet();
- private Set<String> nodePathChains = Sets.newHashSet();
-
- /**
- * Format the declaration of a node based on a PCollection.
- *
- * @param pcollectionImpl PCollection for which a node will be declared
- * @param jobPrototype The job containing the PCollection
- * @return The node declaration
- */
- String formatPCollectionNodeDeclaration(PCollectionImpl<?> pcollectionImpl, JobPrototype jobPrototype) {
- String shape = "box";
- if (pcollectionImpl instanceof InputCollection) {
- shape = "folder";
- }
- return String.format("%s [label=\"%s\" shape=%s];", formatPCollection(pcollectionImpl, jobPrototype), pcollectionImpl.getName(),
- shape);
- }
-
- /**
- * Format a Target as a node declaration.
- *
- * @param target A Target used within a MapReduce pipeline
- * @return The global node declaration for the Target
- */
- String formatTargetNodeDeclaration(Target target) {
- return String.format("\"%s\" [label=\"%s\" shape=folder];", target.toString(), target.toString());
- }
-
- /**
- * Format a PCollectionImpl into a format to be used for dot files.
- *
- * @param pcollectionImpl The PCollectionImpl to be formatted
- * @param jobPrototype The job containing the PCollection
- * @return The dot file formatted representation of the PCollectionImpl
- */
- String formatPCollection(PCollectionImpl<?> pcollectionImpl, JobPrototype jobPrototype) {
- if (pcollectionImpl instanceof InputCollection) {
- InputCollection<?> inputCollection = (InputCollection<?>) pcollectionImpl;
- return String.format("\"%s\"", inputCollection.getSource());
- }
- return String.format("\"%s@%d@%d\"", pcollectionImpl.getName(), pcollectionImpl.hashCode(), jobPrototype.hashCode());
- }
-
- /**
- * Format a collection of node strings into dot file syntax.
- *
- * @param nodeCollection Collection of chained node strings
- * @return The dot-formatted chain of nodes
- */
- String formatNodeCollection(List<String> nodeCollection) {
- return String.format("%s;", Joiner.on(" -> ").join(nodeCollection));
- }
-
- /**
- * Format a NodePath in dot file syntax.
- *
- * @param nodePath The node path to be formatted
- * @param jobPrototype The job containing the NodePath
- * @return The dot file representation of the node path
- */
- List<String> formatNodePath(NodePath nodePath, JobPrototype jobPrototype) {
- List<String> formattedNodePaths = Lists.newArrayList();
-
- List<PCollectionImpl<?>> pcollections = Lists.newArrayList(nodePath);
- for (int collectionIndex = 1; collectionIndex < pcollections.size(); collectionIndex++){
- String fromNode = formatPCollection(pcollections.get(collectionIndex - 1), jobPrototype);
- String toNode = formatPCollection(pcollections.get(collectionIndex), jobPrototype);
- formattedNodePaths.add(formatNodeCollection(Lists.newArrayList(fromNode, toNode)));
- }
- return formattedNodePaths;
- }
-
- /**
- * Add a NodePath to be formatted as a list of node declarations within a
- * single job.
- *
- * @param jobPrototype The job containing the node path
- * @param nodePath The node path to be formatted
- */
- void addNodePathDeclarations(JobPrototype jobPrototype, NodePath nodePath) {
- boolean groupingEncountered = false;
- for (PCollectionImpl<?> pcollectionImpl : nodePath) {
- if (pcollectionImpl instanceof InputCollection) {
- globalNodeDeclarations.add(formatPCollectionNodeDeclaration(pcollectionImpl, jobPrototype));
- } else {
- if (!groupingEncountered){
- groupingEncountered = (pcollectionImpl instanceof PGroupedTableImpl);
- }
-
- MRTaskType taskType = groupingEncountered ? MRTaskType.REDUCE : MRTaskType.MAP;
- jobNodeDeclarations.put(Pair.of(jobPrototype, taskType), formatPCollectionNodeDeclaration(pcollectionImpl, jobPrototype));
- }
- }
- }
-
- /**
- * Add the chaining of a NodePath to the graph.
- *
- * @param nodePath The path to be formatted as a node chain in the dot file
- * @param jobPrototype The job containing the NodePath
- */
- void addNodePathChain(NodePath nodePath, JobPrototype jobPrototype) {
- for (String nodePathChain : formatNodePath(nodePath, jobPrototype)){
- this.nodePathChains.add(nodePathChain);
- }
- }
-
- /**
- * Get the graph attributes for a task-specific subgraph.
- *
- * @param taskType The type of task in the subgraph
- * @return Graph attributes
- */
- String getTaskGraphAttributes(MRTaskType taskType) {
- if (taskType == MRTaskType.MAP) {
- return "label = Map; color = blue;";
- } else {
- return "label = Reduce; color = red;";
- }
- }
-
- /**
- * Add the contents of a {@link JobPrototype} to the graph describing a
- * pipeline.
- *
- * @param jobPrototype A JobPrototype representing a portion of a MapReduce
- * pipeline
- */
- public void addJobPrototype(JobPrototype jobPrototype) {
- jobPrototypes.add(jobPrototype);
- if (!jobPrototype.isMapOnly()) {
- for (NodePath nodePath : jobPrototype.getMapNodePaths()) {
- addNodePathDeclarations(jobPrototype, nodePath);
- addNodePathChain(nodePath, jobPrototype);
- }
- }
-
- HashMultimap<Target, NodePath> targetsToNodePaths = jobPrototype.getTargetsToNodePaths();
- for (Target target : targetsToNodePaths.keySet()) {
- globalNodeDeclarations.add(formatTargetNodeDeclaration(target));
- for (NodePath nodePath : targetsToNodePaths.get(target)) {
- addNodePathDeclarations(jobPrototype, nodePath);
- addNodePathChain(nodePath, jobPrototype);
- nodePathChains.add(formatNodeCollection(Lists.newArrayList(formatPCollection(nodePath.descendingIterator()
- .next(), jobPrototype), String.format("\"%s\"", target.toString()))));
- }
- }
- }
-
- /**
- * Build up the full dot file containing the description of a MapReduce
- * pipeline.
- *
- * @return Graphviz dot file contents
- */
- public String buildDotfile() {
- StringBuilder stringBuilder = new StringBuilder();
- stringBuilder.append("digraph G {\n");
- int clusterIndex = 0;
-
- for (String globalDeclaration : globalNodeDeclarations) {
- stringBuilder.append(String.format(" %s\n", globalDeclaration));
- }
-
- for (JobPrototype jobPrototype : jobPrototypes){
- StringBuilder jobProtoStringBuilder = new StringBuilder();
- jobProtoStringBuilder.append(String.format(" subgraph cluster%d {\n", clusterIndex++));
- for (MRTaskType taskType : MRTaskType.values()){
- Pair<JobPrototype,MRTaskType> jobTaskKey = Pair.of(jobPrototype, taskType);
- if (jobNodeDeclarations.containsKey(jobTaskKey)){
- jobProtoStringBuilder.append(String.format(" subgraph cluster%d {\n", clusterIndex++));
- jobProtoStringBuilder.append(String.format(" %s\n", getTaskGraphAttributes(taskType)));
- for (String declarationEntry : jobNodeDeclarations.get(jobTaskKey)){
- jobProtoStringBuilder.append(String.format(" %s\n", declarationEntry));
- }
- jobProtoStringBuilder.append(" }\n");
- }
- }
- jobProtoStringBuilder.append(" }\n");
- stringBuilder.append(jobProtoStringBuilder.toString());
- }
-
- for (String nodePathChain : nodePathChains) {
- stringBuilder.append(String.format(" %s\n", nodePathChain));
- }
-
- stringBuilder.append("}\n");
- return stringBuilder.toString();
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
deleted file mode 100644
index 1e59df0..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.plan;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.commons.lang.builder.ReflectionToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
-import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * A directed connection from a head {@link Vertex} to a tail {@link Vertex},
- * together with every {@link NodePath} that has been recorded between the two.
- * <p>
- * Equality takes the head, the tail and the recorded paths into account, while
- * the hash code is derived from the head and tail only.
- */
-class Edge {
-  private final Vertex head;
-  private final Vertex tail;
-  private final Set<NodePath> paths;
-
-  /**
-   * Creates an edge between the given vertices with no node paths yet.
-   *
-   * @param head the vertex this edge starts from
-   * @param tail the vertex this edge leads to
-   */
-  public Edge(Vertex head, Vertex tail) {
-    this.head = head;
-    this.tail = tail;
-    this.paths = Sets.newHashSet();
-  }
-
-  /** @return the vertex this edge starts from */
-  public Vertex getHead() {
-    return head;
-  }
-
-  /** @return the vertex this edge leads to */
-  public Vertex getTail() {
-    return tail;
-  }
-
-  /**
-   * Records one more path along this edge.
-   *
-   * @param path the path to add; adding an equal path again has no effect
-   */
-  public void addNodePath(NodePath path) {
-    this.paths.add(path);
-  }
-
-  /**
-   * Records all of the given paths along this edge.
-   *
-   * @param paths the paths to add
-   */
-  public void addAllNodePaths(Collection<NodePath> paths) {
-    this.paths.addAll(paths);
-  }
-
-  /**
-   * @return the set of paths held by this edge; this is the internal set, not
-   *         a copy, so changes made through it are visible to the edge
-   */
-  public Set<NodePath> getNodePaths() {
-    return paths;
-  }
-
-  /**
-   * Determines the collection at which the paths on this edge are split.
-   * <p>
-   * All paths are advanced in lockstep, starting after their first element.
-   * The walk ends at the first step where any path yields a
-   * {@link PGroupedTableImpl}, yields a different collection instance than
-   * the paths examined before it, or has no elements left. The result is
-   * looked up by index in one of the recorded paths (which one is unspecified,
-   * since the paths are held in a hash set).
-   *
-   * @return the element at the index where the lockstep walk ended
-   * @throws IllegalStateException if no node path has been added to this edge
-   */
-  public PCollectionImpl getSplit() {
-    if (paths.isEmpty()) {
-      // Only a path can end the search loop below, so with no paths it would
-      // spin forever; fail fast instead.
-      throw new IllegalStateException("Cannot determine a split point: edge has no node paths");
-    }
-
-    List<Iterator<PCollectionImpl<?>>> iters = Lists.newArrayList();
-    for (NodePath nodePath : paths) {
-      Iterator<PCollectionImpl<?>> iter = nodePath.iterator();
-      iter.next(); // prime this past the initial PGroupedTableImpl
-      iters.add(iter);
-    }
-
-    // Walk the paths in lockstep to find the last index they have in common;
-    // that index is the split point for all of the dependent paths.
-    boolean end = false;
-    int splitIndex = -1;
-    while (!end) {
-      splitIndex++;
-      PCollectionImpl<?> current = null;
-      for (Iterator<PCollectionImpl<?>> iter : iters) {
-        if (iter.hasNext()) {
-          PCollectionImpl<?> next = iter.next();
-          if (next instanceof PGroupedTableImpl) {
-            // Reached a grouping boundary.
-            end = true;
-            break;
-          } else if (current == null) {
-            current = next;
-          } else if (current != next) {
-            // Paths diverge here (compared by identity).
-            end = true;
-            break;
-          }
-        } else {
-          // This path has no further elements.
-          end = true;
-          break;
-        }
-      }
-    }
-    // TODO: Add costing calcs here.
-
-    return Iterables.getFirst(paths, null).get(splitIndex);
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    // instanceof is false for null, so no separate null check is needed.
-    if (!(other instanceof Edge)) {
-      return false;
-    }
-    Edge e = (Edge) other;
-    return head.equals(e.head) && tail.equals(e.tail) && paths.equals(e.paths);
-  }
-
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder().append(head).append(tail).toHashCode();
-  }
-
-  @Override
-  public String toString() {
-    return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE);
-  }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java
deleted file mode 100644
index ce0a847..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.plan;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * Planner graph made of {@link Vertex} instances, one per registered
- * collection, and the {@link Edge}s between them. It also tracks, per vertex,
- * a list of vertices that were marked as its parents.
- */
-class Graph implements Iterable<Vertex> {
-
-  private final Map<PCollectionImpl, Vertex> vertices;
-  private final Map<Pair<Vertex, Vertex>, Edge> edges;
-  private final Map<Vertex, List<Vertex>> dependencies;
-
-  public Graph() {
-    this.vertices = Maps.newHashMap();
-    this.edges = Maps.newHashMap();
-    this.dependencies = Maps.newHashMap();
-  }
-
-  /**
-   * @return the vertex registered for {@code impl}, or {@code null} if none
-   */
-  public Vertex getVertexAt(PCollectionImpl impl) {
-    return vertices.get(impl);
-  }
-
-  /**
-   * Returns the vertex for {@code impl}, creating and registering it first if
-   * it does not exist yet.
-   *
-   * @param impl the collection the vertex stands for
-   * @param output when {@code true}, the vertex is additionally flagged as an
-   *        output; an existing flag is never cleared
-   * @return the existing or newly created vertex
-   */
-  public Vertex addVertex(PCollectionImpl impl, boolean output) {
-    Vertex vertex = vertices.get(impl);
-    if (vertex == null) {
-      vertex = new Vertex(impl);
-      vertices.put(impl, vertex);
-    }
-    if (output) {
-      vertex.setOutput();
-    }
-    return vertex;
-  }
-
-  /**
-   * Returns the edge from {@code head} to {@code tail}. A missing edge is
-   * created, stored, and registered as incoming on the tail and outgoing on
-   * the head.
-   */
-  public Edge getEdge(Vertex head, Vertex tail) {
-    Pair<Vertex, Vertex> key = Pair.of(head, tail);
-    Edge edge = edges.get(key);
-    if (edge != null) {
-      return edge;
-    }
-
-    edge = new Edge(head, tail);
-    edges.put(key, edge);
-    tail.addIncoming(edge);
-    head.addOutgoing(edge);
-    return edge;
-  }
-
-  /**
-   * @return an iterator over a snapshot of the registered vertices
-   */
-  @Override
-  public Iterator<Vertex> iterator() {
-    Set<Vertex> snapshot = Sets.newHashSet(vertices.values());
-    return snapshot.iterator();
-  }
-
-  /**
-   * @return a new set holding every edge currently in the graph
-   */
-  public Set<Edge> getAllEdges() {
-    return Sets.newHashSet(edges.values());
-  }
-
-  /**
-   * Appends {@code parent} to the list of parents recorded for {@code child}.
-   */
-  public void markDependency(Vertex child, Vertex parent) {
-    if (!dependencies.containsKey(child)) {
-      dependencies.put(child, Lists.<Vertex>newArrayList());
-    }
-    dependencies.get(child).add(parent);
-  }
-
-  /**
-   * @return the parents recorded for {@code child}, or an empty immutable list
-   *         when none were ever marked
-   */
-  public List<Vertex> getParents(Vertex child) {
-    List<Vertex> parents = dependencies.get(child);
-    return parents == null ? ImmutableList.<Vertex>of() : parents;
-  }
-
-  /**
-   * Partitions the vertices into groups that are reachable from one another
-   * through {@code Vertex.getAllNeighbors()}.
-   *
-   * @return one list of vertices per component
-   */
-  public List<List<Vertex>> connectedComponents() {
-    List<List<Vertex>> components = Lists.newArrayList();
-    Set<Vertex> unassigned = Sets.newHashSet(vertices.values());
-    while (!unassigned.isEmpty()) {
-      Iterator<Vertex> seeds = unassigned.iterator();
-      Vertex seed = seeds.next();
-      seeds.remove();
-      components.add(collectComponent(seed, unassigned));
-    }
-    return components;
-  }
-
-  /**
-   * Gathers {@code seed} and every still-unassigned vertex reachable from it,
-   * removing each gathered vertex from {@code unassigned}.
-   */
-  private static List<Vertex> collectComponent(Vertex seed, Set<Vertex> unassigned) {
-    List<Vertex> component = Lists.newArrayList();
-    component.add(seed);
-    Set<Vertex> frontier = Sets.newHashSet(seed.getAllNeighbors());
-    while (!frontier.isEmpty()) {
-      Iterator<Vertex> pending = frontier.iterator();
-      Vertex candidate = pending.next();
-      pending.remove();
-      // remove() reports whether the candidate was still unassigned.
-      if (unassigned.remove(candidate)) {
-        component.add(candidate);
-        for (Vertex neighbor : candidate.getAllNeighbors()) {
-          if (unassigned.contains(neighbor)) {
-            frontier.add(neighbor);
-          }
-        }
-      }
-    }
-    return component;
-  }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java
deleted file mode 100644
index 925c39a..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.plan;
-
-import org.apache.crunch.impl.mr.collect.DoCollectionImpl;
-import org.apache.crunch.impl.mr.collect.DoTableImpl;
-import org.apache.crunch.impl.mr.collect.InputCollection;
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
-import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
-import org.apache.crunch.impl.mr.collect.UnionCollection;
-
-/**
- * {@code PCollectionImpl.Visitor} that fills a {@link Graph} while walking
- * from an output collection towards its parents.
- * <p>
- * Input collections and grouped tables become vertices. DoFn collections and
- * tables are pushed onto the path currently being built. Whenever a vertex is
- * reached, the working path is closed and attached to the edge that runs from
- * that vertex to the current working vertex.
- */
-class GraphBuilder implements PCollectionImpl.Visitor {
-
-  private final Graph graph = new Graph();
-  private Vertex workingVertex;
-  private NodePath workingPath;
-
-  /** @return the graph built so far */
-  public Graph getGraph() {
-    return graph;
-  }
-
-  /**
-   * Registers {@code output} as an output vertex, starts a fresh path from it
-   * and visits the collection.
-   */
-  public void visitOutput(PCollectionImpl<?> output) {
-    workingPath = new NodePath();
-    workingVertex = graph.addVertex(output, true);
-    output.accept(this);
-  }
-
-  @Override
-  public void visitInputCollection(InputCollection<?> collection) {
-    Vertex inputVertex = graph.addVertex(collection, false);
-    Edge edge = graph.getEdge(inputVertex, workingVertex);
-    edge.addNodePath(workingPath.close(collection));
-  }
-
-  @Override
-  public void visitUnionCollection(UnionCollection<?> collection) {
-    // Every parent of the union continues from the same vertex with its own
-    // copy of the path built up to this point.
-    final Vertex forkVertex = workingVertex;
-    final NodePath forkPath = workingPath;
-    for (PCollectionImpl<?> parent : collection.getParents()) {
-      workingVertex = forkVertex;
-      workingPath = new NodePath(forkPath);
-      processParent(parent);
-    }
-  }
-
-  @Override
-  public void visitDoFnCollection(DoCollectionImpl<?> collection) {
-    workingPath.push(collection);
-    processParent(collection.getOnlyParent());
-  }
-
-  @Override
-  public void visitDoTable(DoTableImpl<?, ?> collection) {
-    workingPath.push(collection);
-    processParent(collection.getOnlyParent());
-  }
-
-  @Override
-  public void visitGroupedTable(PGroupedTableImpl<?, ?> collection) {
-    Vertex groupedVertex = graph.addVertex(collection, false);
-    Edge edge = graph.getEdge(groupedVertex, workingVertex);
-    edge.addNodePath(workingPath.close(collection));
-
-    // Continue upwards with the grouped table as the new working vertex and
-    // a new path that starts at it.
-    workingVertex = groupedVertex;
-    workingPath = new NodePath(collection);
-    processParent(collection.getOnlyParent());
-  }
-
-  /**
-   * Visits {@code parent} unless it already has a vertex in the graph; in that
-   * case the working path is closed at the parent and attached to the edge
-   * from the existing vertex to the working vertex.
-   */
-  private void processParent(PCollectionImpl<?> parent) {
-    Vertex known = graph.getVertexAt(parent);
-    if (known == null) {
-      parent.accept(this);
-      return;
-    }
-    Edge edge = graph.getEdge(known, workingVertex);
-    edge.addNodePath(workingPath.close(parent));
-  }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java
deleted file mode 100644
index 9ad7300..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.plan;
-
-import java.util.List;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-
-/**
- * Visitor that traverses the {@code DoNode} instances in a job and builds a
- * String that identifies the stages of the pipeline that belong to this job.
- * <p>
- * Names along a single chain of nodes are joined with {@code +}. Where a node
- * list does not hold exactly one node, each non-empty branch is wrapped in
- * square brackets, the branches are joined with {@code /}, and the whole group
- * is wrapped in square brackets again.
- */
-class JobNameBuilder {
-
-  private static final Joiner JOINER = Joiner.on("+");
-  private static final Joiner CHILD_JOINER = Joiner.on("/");
-
-  private final String pipelineName;
-  List<String> rootStack = Lists.newArrayList();
-
-  public JobNameBuilder(final String pipelineName) {
-    this.pipelineName = pipelineName;
-  }
-
-  /** Adds the name parts of {@code node} and its descendants to the job name. */
-  public void visit(DoNode node) {
-    visit(node, rootStack);
-  }
-
-  /** Adds the name parts of {@code nodes} and their descendants to the job name. */
-  public void visit(List<DoNode> nodes) {
-    visit(nodes, rootStack);
-  }
-
-  private void visit(List<DoNode> nodes, List<String> stack) {
-    if (nodes.size() == 1) {
-      // A single node simply extends the current chain.
-      visit(nodes.get(0), stack);
-      return;
-    }
-
-    List<String> branches = Lists.newArrayList();
-    for (DoNode branchRoot : nodes) {
-      List<String> branchParts = Lists.newArrayList();
-      visit(branchRoot, branchParts);
-      if (!branchParts.isEmpty()) {
-        branches.add("[" + JOINER.join(branchParts) + "]");
-      }
-    }
-    if (!branches.isEmpty()) {
-      stack.add("[" + CHILD_JOINER.join(branches) + "]");
-    }
-  }
-
-  private void visit(DoNode node, List<String> stack) {
-    String name = node.getName();
-    if (!name.isEmpty()) {
-      stack.add(name);
-    }
-    visit(node.getChildren(), stack);
-  }
-
-  /**
-   * @return the pipeline name, a colon and a space, then the collected name
-   *         parts joined with {@code +}
-   */
-  public String build() {
-    String stages = JOINER.join(rootStack);
-    return String.format("%s: %s", pipelineName, stages);
-  }
-}