You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/12/07 20:51:05 UTC
[1/3] git commit: CRUNCH-124: Additional javadoc details and fix a
few generics warnings
Updated Branches:
refs/heads/master 374bf3de6 -> 06ef56e60
CRUNCH-124: Additional javadoc details and fix a few generics warnings
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/06ef56e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/06ef56e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/06ef56e6
Branch: refs/heads/master
Commit: 06ef56e6017d296242ec05ca9b8ed36b9661ab84
Parents: 8ce493a
Author: Josh Wills <jw...@apache.org>
Authored: Fri Dec 7 11:45:35 2012 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Dec 7 11:45:35 2012 -0800
----------------------------------------------------------------------
crunch/src/main/java/org/apache/crunch/io/At.java | 30 ++++++++++-
.../src/main/java/org/apache/crunch/io/From.java | 41 ++++++++++++---
crunch/src/main/java/org/apache/crunch/io/To.java | 40 ++++++++++++--
3 files changed, 97 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/06ef56e6/crunch/src/main/java/org/apache/crunch/io/At.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/At.java b/crunch/src/main/java/org/apache/crunch/io/At.java
index d91647b..a6f0782 100644
--- a/crunch/src/main/java/org/apache/crunch/io/At.java
+++ b/crunch/src/main/java/org/apache/crunch/io/At.java
@@ -33,8 +33,34 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
/**
- * Static factory methods for creating common {@link SourceTarget} types, which may be treated as both a {@code Source}
- * and a {@code Target}.
+ * <p>Static factory methods for creating common {@link SourceTarget} types, which may be treated as both a {@code Source}
+ * and a {@code Target}.</p>
+ *
+ * <p>The {@code At} methods are analogous to the {@link From} and {@link To} factory methods, but are used for
+ * storing intermediate outputs that need to be passed from one run of a MapReduce pipeline to another run. The
+ * {@code SourceTarget} object acts as both a {@code Source} and a {@code Target}, which enables it to provide this
+ * functionality.
+ *
+ * <code>
+ * Pipeline pipeline = new MRPipeline(this.getClass());
+ * // Create our intermediate storage location
+ * SourceTarget<String> intermediate = At.textFile("/temptext");
+ * ...
+ * // Write out the output of the first phase of a pipeline.
+ * pipeline.write(phase1, intermediate);
+ *
+ * // Explicitly call run to kick off the pipeline.
+ * pipeline.run();
+ *
+ * // And then kick off a second phase by consuming the output
+ * // from the first phase.
+ * PCollection<String> phase2Input = pipeline.read(intermediate);
+ * ...
+ * </code>
+ * </p>
+ *
+ * <p>The {@code SourceTarget} abstraction is useful when we care about reading the intermediate
+ * outputs of a pipeline as well as the final results.</p>
*/
public class At {
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/06ef56e6/crunch/src/main/java/org/apache/crunch/io/From.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/From.java b/crunch/src/main/java/org/apache/crunch/io/From.java
index 371f934..e4cfb6a 100644
--- a/crunch/src/main/java/org/apache/crunch/io/From.java
+++ b/crunch/src/main/java/org/apache/crunch/io/From.java
@@ -36,13 +36,38 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
- * Static factory methods for creating common {@link Source} types.
+ * <p>Static factory methods for creating common {@link Source} types.</p>
+ *
+ * <p>The {@code From} class is intended to provide a literate API for creating
+ * Crunch pipelines from common input file types.
+ *
+ * <code>
+ * Pipeline pipeline = new MRPipeline(this.getClass());
+ *
+ * // Reference the lines of a text file by wrapping the TextInputFormat class.
+ * PCollection<String> lines = pipeline.read(From.textFile("/path/to/myfiles"));
+ *
+ * // Reference entries from a sequence file where the key is a LongWritable and the
+ * // value is a custom Writable class.
+ * PTable<LongWritable, MyWritable> table = pipeline.read(From.sequenceFile(
+ * "/path/to/seqfiles", LongWritable.class, MyWritable.class));
+ *
+ * // Reference the records from an Avro file, where MyAvroObject implements Avro's
+ * // SpecificRecord interface.
+ * PCollection<MyAvroObject> myObjects = pipeline.read(From.avroFile("/path/to/avrofiles",
+ * MyAvroObject.class));
+ *
+ * // Reference the key-value pairs from a custom extension of FileInputFormat:
+ * PTable<KeyWritable, ValueWritable> custom = pipeline.read(From.formattedFile(
+ * "/custom", MyFileInputFormat.class, KeyWritable.class, ValueWritable.class));
+ * </code>
+ * </p>
*/
public class From {
/**
* Creates a {@code TableSource<K, V>} for reading data from files that have custom
- * {@code FileInputFormat} implementations not covered by the provided {@code TableSource}
+ * {@code FileInputFormat<K, V>} implementations not covered by the provided {@code TableSource}
* and {@code Source} factory methods.
*
* @param pathName The name of the path to the data on the filesystem
@@ -52,14 +77,14 @@ public class From {
* @return A new {@code TableSource<K, V>} instance
*/
public static <K extends Writable, V extends Writable> TableSource<K, V> formattedFile(
- String pathName, Class<? extends FileInputFormat> formatClass,
+ String pathName, Class<? extends FileInputFormat<K, V>> formatClass,
Class<K> keyClass, Class<V> valueClass) {
return formattedFile(new Path(pathName), formatClass, keyClass, valueClass);
}
/**
* Creates a {@code TableSource<K, V>} for reading data from files that have custom
- * {@code FileInputFormat} implementations not covered by the provided {@code TableSource}
+ * {@code FileInputFormat<K, V>} implementations not covered by the provided {@code TableSource}
* and {@code Source} factory methods.
*
* @param path The {@code Path} to the data
@@ -69,7 +94,7 @@ public class From {
* @return A new {@code TableSource<K, V>} instance
*/
public static <K extends Writable, V extends Writable> TableSource<K, V> formattedFile(
- Path path, Class<? extends FileInputFormat> formatClass,
+ Path path, Class<? extends FileInputFormat<K, V>> formatClass,
Class<K> keyClass, Class<V> valueClass) {
return formattedFile(path, formatClass, Writables.writables(keyClass),
Writables.writables(valueClass));
@@ -86,7 +111,8 @@ public class From {
* @param valueType The {@code PType} to use for the value
* @return A new {@code TableSource<K, V>} instance
*/
- public static <K, V> TableSource<K, V> formattedFile(String pathName, Class<? extends FileInputFormat> formatClass,
+ public static <K, V> TableSource<K, V> formattedFile(String pathName,
+ Class<? extends FileInputFormat<?, ?>> formatClass,
PType<K> keyType, PType<V> valueType) {
return formattedFile(new Path(pathName), formatClass, keyType, valueType);
}
@@ -102,7 +128,8 @@ public class From {
* @param valueType The {@code PType} to use for the value
* @return A new {@code TableSource<K, V>} instance
*/
- public static <K, V> TableSource<K, V> formattedFile(Path path, Class<? extends FileInputFormat> formatClass,
+ public static <K, V> TableSource<K, V> formattedFile(Path path,
+ Class<? extends FileInputFormat<?, ?>> formatClass,
PType<K> keyType, PType<V> valueType) {
PTableType<K, V> tableType = keyType.getFamily().tableOf(keyType, valueType);
return new FileTableSourceImpl<K, V>(path, tableType, formatClass);
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/06ef56e6/crunch/src/main/java/org/apache/crunch/io/To.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/To.java b/crunch/src/main/java/org/apache/crunch/io/To.java
index d7af01b..d62d294 100644
--- a/crunch/src/main/java/org/apache/crunch/io/To.java
+++ b/crunch/src/main/java/org/apache/crunch/io/To.java
@@ -23,10 +23,39 @@ import org.apache.crunch.io.impl.FileTargetImpl;
import org.apache.crunch.io.seq.SeqFileTarget;
import org.apache.crunch.io.text.TextFileTarget;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
- * Static factory methods for creating common {@link Target} types.
+ * <p>Static factory methods for creating common {@link Target} types.</p>
+ *
+ * <p>The {@code To} class is intended to be used as part of a literate API
+ * for writing the output of Crunch pipelines to common file types. We can use
+ * the {@code Target} objects created by the factory methods in the {@code To}
+ * class with either the {@code write} method on the {@code Pipeline} class or
+ * the convenience {@code write} method on {@code PCollection} and {@code PTable}
+ * instances.
+ *
+ * <code>
+ * Pipeline pipeline = new MRPipeline(this.getClass());
+ * ...
+ * // Write a PCollection<String> to a text file:
+ * PCollection<String> words = ...;
+ * pipeline.write(words, To.textFile("/put/my/words/here"));
+ *
+ * // Write a PTable<Text, Text> to a sequence file:
+ * PTable<Text, Text> textToText = ...;
+ * textToText.write(To.sequenceFile("/words/to/words"));
+ *
+ * // Write a PCollection<MyAvroObject> to an Avro data file:
+ * PCollection<MyAvroObject> objects = ...;
+ * objects.write(To.avroFile("/my/avro/files"));
+ *
+ * // Write a PTable to a custom FileOutputFormat:
+ * PTable<KeyWritable, ValueWritable> custom = ...;
+ * pipeline.write(custom, To.formattedFile("/custom", MyFileFormat.class));
+ * </code>
+ * </p>
*/
public class To {
@@ -35,10 +64,11 @@ public class To {
* a custom {@code FileOutputFormat}.
*
* @param pathName The name of the path to write the data to on the filesystem
- * @param formatClass The {@code FileOutputFormat} to write the data to
+ * @param formatClass The {@code FileOutputFormat<K, V>} to write the data to
* @return A new {@code Target} instance
*/
- public static Target formattedFile(String pathName, Class<? extends FileOutputFormat> formatClass) {
+ public static <K extends Writable, V extends Writable> Target formattedFile(
+ String pathName, Class<? extends FileOutputFormat<K, V>> formatClass) {
return formattedFile(new Path(pathName), formatClass);
}
@@ -50,7 +80,8 @@ public class To {
* @param formatClass The {@code FileOutputFormat} to write the data to
* @return A new {@code Target} instance
*/
- public static Target formattedFile(Path path, Class<? extends FileOutputFormat> formatClass) {
+ public static <K extends Writable, V extends Writable> Target formattedFile(
+ Path path, Class<? extends FileOutputFormat<K, V>> formatClass) {
return new FileTargetImpl(path, formatClass, new SequentialFileNamingScheme());
}
@@ -119,5 +150,4 @@ public class To {
public static Target textFile(Path path) {
return new TextFileTarget(path);
}
-
}