You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2012/10/05 14:02:52 UTC
git commit: Add javadoc to MRPipeline constructors
Updated Branches:
refs/heads/master ef10e80bc -> ed7481d9c
Add javadoc to MRPipeline constructors
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/ed7481d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/ed7481d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/ed7481d9
Branch: refs/heads/master
Commit: ed7481d9c88fc9968efa24a18d9b63ce25f0f9d0
Parents: ef10e80
Author: Gabriel Reid <gr...@apache.org>
Authored: Fri Oct 5 14:02:22 2012 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Fri Oct 5 14:02:22 2012 +0200
----------------------------------------------------------------------
.../java/org/apache/crunch/impl/mr/MRPipeline.java | 58 ++++++++++----
1 files changed, 41 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ed7481d9/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index f32783a..043f7b1 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -58,6 +58,9 @@ import org.apache.log4j.Logger;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+/**
+ * Pipeline implementation that is executed within Hadoop MapReduce.
+ */
public class MRPipeline implements Pipeline {
private static final Log LOG = LogFactory.getLog(MRPipeline.class);
@@ -74,18 +77,43 @@ public class MRPipeline implements Pipeline {
private Configuration conf;
+ /**
+ * Instantiate with a default Configuration and name.
+ *
+ * @param jarClass Class containing the main driver method for running the pipeline
+ */
public MRPipeline(Class<?> jarClass) {
this(jarClass, new Configuration());
}
+ /**
+ * Instantiate with a custom pipeline name. The name will be displayed in the Hadoop JobTracker.
+ *
+ * @param jarClass Class containing the main driver method for running the pipeline
+ * @param name Display name of the pipeline
+ */
public MRPipeline(Class<?> jarClass, String name) {
this(jarClass, name, new Configuration());
}
+ /**
+ * Instantiate with a custom configuration and default naming.
+ *
+ * @param jarClass Class containing the main driver method for running the pipeline
+ * @param conf Configuration to be used within all MapReduce jobs run in the pipeline
+ */
public MRPipeline(Class<?> jarClass, Configuration conf) {
this(jarClass, jarClass.getName(), conf);
}
+ /**
+ * Instantiate with a custom name and configuration. The name will be displayed in the Hadoop
+ * JobTracker.
+ *
+ * @param jarClass Class containing the main driver method for running the pipeline
+ * @param name Display name of the pipeline
+ * @param conf Configuration to be used within all MapReduce jobs run in the pipeline
+ */
public MRPipeline(Class<?> jarClass, String name, Configuration conf) {
this.jarClass = jarClass;
this.name = name;
@@ -165,8 +193,8 @@ public class MRPipeline implements Pipeline {
if (pcollection instanceof PGroupedTableImpl) {
pcollection = ((PGroupedTableImpl<?, ?>) pcollection).ungroup();
} else if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) {
- pcollection = pcollection.parallelDo("UnionCollectionWrapper", (MapFn) IdentityFn.<Object> getInstance(),
- pcollection.getPType());
+ pcollection = pcollection.parallelDo("UnionCollectionWrapper",
+ (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
}
addOutput((PCollectionImpl<?>) pcollection, target);
}
@@ -192,17 +220,14 @@ public class MRPipeline implements Pipeline {
}
/**
- * Retrieve a ReadableSourceTarget that provides access to the contents of a
- * {@link PCollection}. This is primarily intended as a helper method to
- * {@link #materialize(PCollection)}. The underlying data of the
- * ReadableSourceTarget may not be actually present until the pipeline is run.
+ * Retrieve a ReadableSourceTarget that provides access to the contents of a {@link PCollection}.
+ * This is primarily intended as a helper method to {@link #materialize(PCollection)}. The
+ * underlying data of the ReadableSourceTarget may not be actually present until the pipeline is
+ * run.
*
- * @param pcollection
- * The collection for which the ReadableSourceTarget is to be
- * retrieved
+ * @param pcollection The collection for which the ReadableSourceTarget is to be retrieved
* @return The ReadableSourceTarget
- * @throws IllegalArgumentException
- * If no ReadableSourceTarget can be retrieved for the given
+ * @throws IllegalArgumentException If no ReadableSourceTarget can be retrieved for the given
* PCollection
*/
public <T> ReadableSourceTarget<T> getMaterializeSourceTarget(PCollection<T> pcollection) {
@@ -237,11 +262,10 @@ public class MRPipeline implements Pipeline {
}
/**
- * Safely cast a PCollection into a PCollectionImpl, including handling the
- * case of UnionCollections.
+ * Safely cast a PCollection into a PCollectionImpl, including handling the case of
+ * UnionCollections.
*
- * @param pcollection
- * The PCollection to be cast/transformed
+ * @param pcollection The PCollection to be cast/transformed
* @return The PCollectionImpl representation
*/
private <T> PCollectionImpl<T> toPcollectionImpl(PCollection<T> pcollection) {
@@ -282,8 +306,8 @@ public class MRPipeline implements Pipeline {
@Override
public <T> void writeTextFile(PCollection<T> pcollection, String pathName) {
// Ensure that this is a writable pcollection instance.
- pcollection = pcollection.parallelDo("asText", IdentityFn.<T> getInstance(),
- WritableTypeFamily.getInstance().as(pcollection.getPType()));
+ pcollection = pcollection.parallelDo("asText", IdentityFn.<T> getInstance(), WritableTypeFamily
+ .getInstance().as(pcollection.getPType()));
write(pcollection, At.textFile(pathName));
}