Posted to commits@pig.apache.org by ol...@apache.org on 2010/03/23 20:47:25 UTC
svn commit: r926752 [2/2] - in /hadoop/pig/trunk: ./
src/docs/src/documentation/content/xdocs/
Modified: hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/zebra_mapreduce.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/zebra_mapreduce.xml?rev=926752&r1=926751&r2=926752&view=diff
==============================================================================
--- hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/zebra_mapreduce.xml (original)
+++ hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/zebra_mapreduce.xml Tue Mar 23 19:47:24 2010
@@ -36,10 +36,11 @@
<!-- HADOOP M/R API-->
<section>
<title>Hadoop MapReduce APIs</title>
- <p>Zebra requires Hadoop 20. However, this release of Zebra only supports the "old" jobconf-style MapReduce APIs. </p>
+ <p>Zebra requires Hadoop 0.20. This release of Zebra supports the "new" JobContext-style MapReduce API; the "old" JobConf-style API is still supported but deprecated.</p>
<ul>
- <li>"old" mapreduce API - org.apache.hadoop.mapred.* - supported</li>
- <li>"new" mapreduce API - org.apache.hadoop.mapreduce.* - not supported</li>
+ <li>org.apache.hadoop.mapreduce.* - supported ("new" JobContext-style MapReduce API)</li>
+ <li>org.apache.hadoop.mapred.* - supported, but deprecated ("old" JobConf-style MapReduce API)</li>
+
</ul>
<p></p>
</section>
@@ -48,203 +49,18 @@
<!-- ZEBRA API-->
<section>
<title>Zebra MapReduce APIs</title>
- <p>Zebra includes several classes for use in MapReduce programs. The main entry point into Zebra are the two classes for reading and writing tables, namely TableInputFormat and BasicTableOutputFormat. </p>
-
- <section>
- <title>BasicTableOutputFormat </title>
- <table>
- <tr><th>Static</th><th>Method</th><th>Description</th></tr>
- <tr>
- <td>yes</td>
- <td>void setOutputPath(JobConf, Path) </td>
- <td>Set the output path of the BasicTable in JobConf </td>
- </tr>
- <tr>
- <td>yes</td>
- <td>Path[] getOutputPaths(JobConf) </td>
- <td>Get the output paths of the BasicTable from JobConf </td>
- </tr>
- <tr>
- <td>yes</td>
- <td>void setStorageInfo(JobConf, ZebraSchema, ZebraStorageHint, ZebraSortInfo) </td>
- <td>Set the table storage information (schema, storagehint, sortinfo) in JobConf</td>
- </tr>
- <tr>
- <td>yes</td>
- <td>Schema getSchema(JobConf) </td>
- <td>Get the table schema in JobConf </td>
- </tr>
- <tr>
- <td>yes</td>
- <td>BytesWritable generateSortKey(JobConf, Tuple) </td>
- <td>Generates a BytesWritable key for the input key </td>
- </tr>
- <tr>
- <td>yes</td>
- <td>String getStorageHint(JobConf) </td>
- <td>Get the table storage hint in JobConf </td>
- </tr>
- <tr>
- <td>yes</td>
- <td>SortInfo getSortInfo(JobConf) </td>
- <td>Get the SortInfo object </td>
- </tr>
- <tr>
- <td>yes</td>
- <td>void close(JobConf) </td>
- <td>Close the output BasicTable, No more rows can be added into the table </td>
- </tr>
- <tr>
- <td>yes</td>
- <td>void setMultipleOutputs(JobConf, String commaSeparatedLocs, Class < extends ZebraOutputPartition> theClass) </td>
- <td>Enables data to be written to multiple zebra tables based on the ZebraOutputPartition class.
- See <a href="zebra_mapreduce.html#Multiple+Table+Outputs">Multiple Table Outputs.</a></td>
- </tr>
- </table>
- </section>
-
- <section>
- <title>TableInputFormat </title>
- <table>
- <tr><th>Static</th><th>Method</th><th>Description</th></tr>
- <tr>
- <td>yes</td>
- <td>void setInputPaths(JobConf, Path... paths) </td>
- <td>Set the paths to the input table </td>
-
- </tr>
- <tr>
- <td>yes</td>
- <td>Path[] getInputPaths(JobConf) </td>
- <td>Get the comma-separated paths to the input table or table union </td>
- </tr>
- <tr>
- <td>yes</td>
- <td>Schema getSchema(JobConf) </td>
- <td>Get the schema of a table expr </td>
- </tr>
- <tr>
- <td>yes</td>
- <td>void setProjection(JobConf, ZebraProjection) </td>
- <td>Set the input projection in the JobConf object </td>
- </tr>
- <tr>
- <td>yes</td>
- <td>String getProjection(JobConf) </td>
- <td>Get the projection from the JobConf </td>
- </tr>
- <tr>
- <td>yes</td>
- <td>SortInfo getSortInfo(JobConf) </td>
- <td>Get the SortInfo object regarding a Zebra table </td>
- </tr>
- <tr>
- <td>yes</td>
- <td>void requireSortedTable(JobConf, String sortcolumns, BytesComparator comparator) </td>
- <td>Requires sorted table or table union </td>
- </tr>
- <tr>
- <td> yes </td>
- <td>TableRecordReader getTableRecordReader(JobConf, ZebraProjection) </td>
- <td>Get a TableRecordReader on a single split </td>
- </tr>
- <tr>
- <td>yes</td>
- <td>void setMinSplitSize(JobConf, long minSize) </td>
- <td>Set the minimum split size, default of 1M bytes </td>
- </tr>
- </table>
- </section>
-
- <section>
- <title>TableRecordReader </title>
- <table>
- <tr><th>Static</th><th>Method</th><th>Description</th></tr>
- <tr>
- <td>no</td>
- <td>boolean seekTo(BytesWritable key) </td>
- <td>Seek to the position at the first row which has the key (returning true) or just after the key(returning false); only applicable for sorted Zebra table. </td>
- </tr>
- </table>
- </section>
-
-
-
- <section>
- <title>ZebraOutputPartition </title>
- <table>
- <tr><th>Static</th><th>Method</th><th>Description</th></tr>
- <tr>
- <td> no </td>
- <td>public abstract int getOutputPartition(BytesWritable key, Tuple value) </td>
- <td>Abstract method from ZebraOutputPartition abstract class. App implements this to stream data to different table </td>
- </tr>
- <tr>
- <td> no </td>
- <td>void setConf(Configuration jobConf) </td>
- <td>Initialization routine giving JobConf to application. Zebra implements it </td>
- </tr>
- <tr>
- <td> no </td>
- <td>Configuration getConf() </td>
- <td> returns JobConf. Zebra implements it</td>
- </tr>
- <tr>
- <td>yes </td>
- <td>Class< extends ZebraOutputPartition> getZebraOutputPartitionClass(JobConf conf) </td>
- <td>return user implemented ZebraOutputPartition class </td>
- </tr>
- </table>
- </section>
-
-
- <section>
- <title>ZebraProjection </title>
- <table>
- <tr><th>Static</th><th>Method</th><th>Description</th></tr>
- <tr>
- <td>yes</td>
- <td>ZebraProjection createZebraProjection(String) </td>
- <td>Create a ZebraProjection object from a string representing projection information. </td>
- </tr>
- </table>
- </section>
-
- <section>
- <title>ZebraSchema</title>
- <table>
- <tr><th>Static</th><th>Method</th><th>Description</th></tr>
- <tr>
- <td>yes</td>
- <td>ZebraSchema createZebraSchema(String) </td>
- <td>Create a ZebraStorageHint object from a string representing storage hint information.</td>
- </tr>
- </table>
- </section>
-
- <section>
- <title>ZebraStorageHint </title>
- <table>
- <tr><th>Static</th><th>Method</th><th>Description</th></tr>
- <tr>
- <td>yes</td>
- <td>ZebraStorageHint createZebraStorageHint(String) </td>
- <td>Create a ZebraStorageHint object from a string representing storage hint information. </td>
- </tr>
- </table>
- </section>
-
- <section>
- <title>ZebraSortInfo </title>
- <table>
- <tr><th>Static</th><th>Method</th><th>Description</th></tr>
- <tr>
- <td>yes</td>
- <td>ZebraSortInfo createZebraSortInfo(String sortColumns, Class< extends RawComparator < Object >> comparatorClass) </td>
- <td>Create a ZebraSortInfo object from a sort columns string and a comparator class. </td>
- </tr>
- </table>
- </section>
+ <p>Zebra includes several classes for use in MapReduce programs, located here (.....).</p>
+ <p>The main entry points into Zebra are the two classes for reading and writing tables: TableInputFormat and BasicTableOutputFormat. The Zebra MapReduce API includes the following classes:</p>
+ <ul>
+ <li>BasicTableOutputFormat</li>
+ <li>TableInputFormat</li>
+ <li>TableRecordReader</li>
+ <li>ZebraOutputPartition</li>
+ <li>ZebraProjection</li>
+ <li>ZebraSchema</li>
+ <li>ZebraStorageHint</li>
+ <li>ZebraSortInfo</li>
+ </ul>
</section>
<!-- END ZEBRA API-->
@@ -262,7 +78,7 @@ The Zebra table in this example has two
The output format is specified as follows:</p>
<source>
-BasicTableOutputFormat.setStorageInfo(jobConf,
+BasicTableOutputFormat.setStorageInfo(jobContext,
ZebraSchema.createZebraSchema("word:string, count:int"),
ZebraStorageHint.createZebraStorageHint("[word];[count]"),
null);
@@ -286,7 +102,7 @@ The reducer sums the counts and produces
</p>
<source>
-package org.apache.hadoop.zebra.mapred;
+package org.apache.hadoop.zebra.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@@ -295,170 +111,185 @@ import org.apache.hadoop.io.BytesWritabl
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.pig.data.Tuple;
+import org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat;
+import org.apache.hadoop.zebra.mapreduce.TableInputFormat;
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.schema.Schema;
import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.Tuple;
import java.io.IOException;
import java.util.Iterator;
-public class TableMapReduceExample extends Configured implements Tool {
-
- static class Map extends MapReduceBase implements Mapper<LongWritable, Text, BytesWritable, Tuple> {
- private BytesWritable bytesKey;
- private Tuple tupleRow;
-
- /**
- * Map method for reading input.
- */
- @Override
- public void map(LongWritable key, Text value,
- OutputCollector<BytesWritable, Tuple> output, Reporter reporter)
- throws IOException {
-
- // value should contain "word count"
- String[] wordCount = value.toString().split(" ");
- if (wordCount.length != 2) {
- // LOG the error
- throw new IOException("Value does not contain two fields:" + value);
- }
- byte[] word = wordCount[0].getBytes();
- bytesKey.set(word, 0, word.length);
- tupleRow.set(0, new String(word));
- tupleRow.set(1, Integer.parseInt(wordCount[1]));
- output.collect(bytesKey, tupleRow);
- }
-
- /**
- * Configuration of the job. Here we create an empty Tuple Row.
- */
- @Override
- public void configure(JobConf job) {
- bytesKey = new BytesWritable();
- try {
- Schema outSchema = BasicTableOutputFormat.getSchema(job);
- tupleRow = TypesUtils.createTuple(outSchema);
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (ParseException e) {
- throw new RuntimeException(e);
- }
- }
-
- }
-
- static class ProjectionMap extends MapReduceBase implements Mapper<BytesWritable, Tuple, Text, IntWritable> {
- private final static Text all = new Text("All");
-
- /**
- * Map method which gets count column after projection.
- *
- * @throws IOException
- */
- @Override
- public void map(BytesWritable key, Tuple value,
- OutputCollector<Text, IntWritable> output, Reporter reporter)
- throws IOException {
- output.collect(all, new IntWritable((Integer) value.get(0)));
- }
- }
-
- public static class ProjectionReduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
- /**
- * Reduce method which implements summation. Acts as both reducer and combiner.
- *
- * @throws IOException
- */
- public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
- int sum = 0;
- while (values.hasNext()) {
- sum += values.next().get();
- }
- output.collect(key, new IntWritable(sum));
- }
- }
-
- /**
- * Where jobs and their settings and sequence is set.
- *
- * @param args arguments with exception of Tools understandable ones.
- */
- public int run(String[] args) throws Exception {
- if (args == null || args.length != 3) {
- System.out.println("usage: TableMapReduceExample input_path_for_text_file output_path_for_table output_path_for_text_file");
- System.exit(-1);
- }
-
- /*
- First MR Job creating a Table with two columns
- */
- JobConf jobConf = new JobConf();
- jobConf.setJobName("TableMapReduceExample");
- jobConf.set("table.output.tfile.compression", "none");
-
- // Input settings
- jobConf.setInputFormat(TextInputFormat.class);
- jobConf.setMapperClass(Map.class);
- FileInputFormat.setInputPaths(jobConf, new Path(args[0]));
-
- // Output settings
- jobConf.setOutputFormat(BasicTableOutputFormat.class);
- BasicTableOutputFormat.setOutputPath(jobConf, new Path(args[1]));
-
- // set the storage info of logical schema with 2 columns;
- // and create 2 physical column groups;
- // unsorted table
-
- BasicTableOutputFormat.setStorageInfo(jobConf,
- ZebraSchema.createZebraSchema("word:string, count:int"),
- ZebraStorageHint.createZebraStorageHint("[word];[count]"), null);
-
- // set map-only job.
- jobConf.setNumReduceTasks(0);
-
- // Run Job
- JobClient.runJob(jobConf);
-
- // Need to close Zebra output streams
- BasicTableOutputFormat.close(jobConf);
-
- /*
- Second MR Job for Table Projection of count column
- */
- JobConf projectionJobConf = new JobConf();
- projectionJobConf.setJobName("TableProjectionMapReduceExample");
-
- // Input settings
- projectionJobConf.setMapperClass(ProjectionMap.class);
- projectionJobConf.setInputFormat(TableInputFormat.class);
- TableInputFormat.setProjection(projectionJobConf, "count");
- TableInputFormat.setInputPaths(projectionJobConf, new Path(args[1]));
- projectionJobConf.setMapOutputKeyClass(Text.class);
- projectionJobConf.setMapOutputValueClass(IntWritable.class);
-
- // Output settings
- projectionJobConf.setOutputFormat(TextOutputFormat.class);
- FileOutputFormat.setOutputPath(projectionJobConf, new Path(args[2]));
- projectionJobConf.setReducerClass(ProjectionReduce.class);
- projectionJobConf.setCombinerClass(ProjectionReduce.class);
-
- // Run Job
- JobClient.runJob(projectionJobConf);
-
- return 0;
- }
+public class TableMapReduceExample extends Configured implements Tool {
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(), new TableMapReduceExample(), args);
- System.exit(res);
- }
+ static class Map extends Mapper<LongWritable, Text, BytesWritable, Tuple> {
+ private BytesWritable bytesKey;
+ private Tuple tupleRow;
+
+ /**
+ * Map method for reading input.
+ */
+ @Override
+ public void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+
+ // value should contain "word count"
+ String[] wordCount = value.toString().split(" ");
+ if (wordCount.length != 2) {
+ // LOG the error
+ throw new IOException("Value does not contain two fields:" + value);
+ }
+
+ byte[] word = wordCount[0].getBytes();
+ bytesKey.set(word, 0, word.length);
+ tupleRow.set(0, new String(word));
+ tupleRow.set(1, Integer.parseInt(wordCount[1]));
+
+ context.write( bytesKey, tupleRow );
+ }
+
+ /**
+ * Configuration of the job. Here we create an empty Tuple Row.
+ */
+ @Override
+ public void setup(Context context) {
+ bytesKey = new BytesWritable();
+ try {
+ Schema outSchema = BasicTableOutputFormat.getSchema( context );
+ tupleRow = TypesUtils.createTuple(outSchema);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (ParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ static class ProjectionMap extends
+ Mapper<BytesWritable, Tuple, Text, IntWritable> {
+ private final static Text all = new Text("All");
+
+ /**
+ * Map method which gets count column after projection.
+ *
+ * @throws IOException
+ */
+ @Override
+ public void map(BytesWritable key, Tuple value, Context context)
+ throws IOException, InterruptedException {
+ context.write( all, new IntWritable((Integer) value.get(0)) );
+ }
+ }
+
+ public static class ProjectionReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
+ /**
+ * Reduce method which implements summation. Acts as both reducer and
+ * combiner.
+ *
+ * @throws IOException
+ */
+ @Override
+ public void reduce(Text key, Iterable<IntWritable> values, Context context)
+ throws IOException, InterruptedException {
+ int sum = 0;
+ Iterator<IntWritable> iterator = values.iterator();
+ while (iterator.hasNext()) {
+ sum += iterator.next().get();
+ }
+ context.write(key, new IntWritable(sum));
+ }
+ }
+
+ /**
+ * Configures the two jobs and runs them in sequence.
+ *
+ * @param args
+ * arguments remaining after ToolRunner has consumed the generic options.
+ */
+ public int run(String[] args) throws Exception {
+ if (args == null || args.length != 3) {
+ System.out
+ .println("usage: TableMapReduceExample input_path_for_text_file output_path_for_table output_path_for_text_file");
+ System.exit(-1);
+ }
+
+ /*
+ * First MR Job creating a Table with two columns
+ */
+ Job job = new Job();
+ job.setJobName("TableMapReduceExample");
+ Configuration conf = job.getConfiguration();
+ conf.set("table.output.tfile.compression", "none");
+
+ // Input settings
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setMapperClass(Map.class);
+ FileInputFormat.setInputPaths(job, new Path(args[0]));
+
+ // Output settings
+ job.setOutputFormatClass(BasicTableOutputFormat.class);
+ BasicTableOutputFormat.setOutputPath( job, new Path(args[1]) );
+
+ // set the logical schema with 2 columns
+ BasicTableOutputFormat.setSchema( job, "word:string, count:int" );
+
+ // for demo purposes, create 2 physical column groups
+ BasicTableOutputFormat.setStorageHint( job, "[word];[count]" );
+
+ // set map-only job.
+ job.setNumReduceTasks(0);
+
+ // Run the job and wait for it: the second job reads the table it writes
+ if (!job.waitForCompletion(true)) return 1;
+
+ /*
+ * Second MR Job for Table Projection of count column
+ */
+ Job projectionJob = new Job();
+ projectionJob.setJobName("TableProjectionMapReduceExample");
+ conf = projectionJob.getConfiguration();
+
+ // Input settings
+ projectionJob.setMapperClass(ProjectionMap.class);
+ projectionJob.setInputFormatClass(TableInputFormat.class);
+ TableInputFormat.setProjection(projectionJob, "count");
+ TableInputFormat.setInputPaths(projectionJob, new Path(args[1]));
+ projectionJob.setMapOutputKeyClass(Text.class);
+ projectionJob.setMapOutputValueClass(IntWritable.class);
+
+ // Output settings
+ projectionJob.setOutputFormatClass(TextOutputFormat.class);
+ FileOutputFormat.setOutputPath(projectionJob, new Path(args[2]));
+ projectionJob.setReducerClass(ProjectionReduce.class);
+ projectionJob.setCombinerClass(ProjectionReduce.class);
+
+ // Run Job
+ boolean succeeded = projectionJob.waitForCompletion(true);
+
+ return succeeded ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new Configuration(), new TableMapReduceExample(),
+ args);
+ System.exit(res);
+ }
}
+
+
</source>
</section>
<!-- END ZEBRA TABLE OUTPUT EXAMPLE-->
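Note on the example above: ProjectionReduce can double as the combiner because integer addition is associative and commutative, so summing per-map partial sums yields the same total as summing every value once, however many times Hadoop runs the combiner (zero, one, or several). The plain-Java sketch below checks that property without any Hadoop dependency; the class name CombinerSumSketch and the sample counts are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

public class CombinerSumSketch {
    // Same summation that ProjectionReduce.reduce() performs over one key's values.
    static int sum(Iterable<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        // Counts as two hypothetical map tasks might emit them for the key "All".
        List<Integer> mapTask1 = Arrays.asList(3, 5, 2);
        List<Integer> mapTask2 = Arrays.asList(7, 1);

        // Combiner step: one partial sum per map task.
        int partial1 = sum(mapTask1);
        int partial2 = sum(mapTask2);

        // Reduce step: sum the partial sums.
        int combined = sum(Arrays.asList(partial1, partial2));

        // Summing all raw values in a single pass gives the same result.
        int direct = sum(Arrays.asList(3, 5, 2, 7, 1));

        System.out.println(combined + " " + direct); // prints "18 18"
    }
}
```

The same reasoning would not hold for a non-associative operation such as computing an average directly, which is why only simple aggregations are usually reused as combiners.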
@@ -475,94 +306,84 @@ the data as in the example above (word,
</p>
<source>
- package org.apache.hadoop.zebra.mapred;
+package org.apache.hadoop.zebra.mapreduce;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.zebra.mapred.BasicTableOutputFormat;
-import org.apache.hadoop.zebra.mapred.TableInputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat;
+import org.apache.hadoop.zebra.mapreduce.TableInputFormat;
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.schema.Schema;
import org.apache.hadoop.zebra.types.TypesUtils;
import org.apache.pig.data.Tuple;
-
public class TableMRSample2 {
- static class MapClass implements
- Mapper<BytesWritable, Tuple, BytesWritable, Tuple> {
- private BytesWritable bytesKey;
- private Tuple tupleRow;
-
- @Override
- public void map(BytesWritable key, Tuple value,
- OutputCollector<BytesWritable, Tuple> output, Reporter reporter)
- throws IOException
-
- {
- System.out.println(key.toString() + value.toString());
- output.collect(key, value);
- }
-
- @Override
- public void configure(JobConf job) {
- bytesKey = new BytesWritable();
- try {
- Schema outSchema = BasicTableOutputFormat.getSchema(job);
- tupleRow = TypesUtils.createTuple(outSchema);
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (ParseException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void close() throws IOException {
- // no-op
- }
-
- public static void main(String[] args) throws ParseException, IOException {
- JobConf jobConf = new JobConf();
- jobConf.setJobName("tableMRSample");
- jobConf.set("table.output.tfile.compression", "gz");
-
- // input settings
- jobConf.setInputFormat(TableInputFormat.class);
- jobConf.setOutputFormat(BasicTableOutputFormat.class);
- jobConf.setMapperClass(TableMRSample2.MapClass.class);
-
- List
- <Path> paths = new ArrayList<Path>(2);
- Path p = new Path("/user/mapredu/t1");
- System.out.println("path = " + p);
- paths.add(p);
- p = new Path("/user/mapredu/t2");
- paths.add(p);
-
- TableInputFormat.setInputPaths(jobConf, paths.toArray(new Path[2]));
- ZebraProjection zebraProjection = ZebraProjection.createZebraProjection("word");
- TableInputFormat.setProjection(jobConf, zebraProjection);
- BasicTableOutputFormat.setOutputPath(jobConf, new Path("/user/mapredu2/t1"));
-
- ZebraSchema zebraSchema = ZebraSchema.createZebraSchema("word:string");
- ZebraStorageHint zebraStorageHint = ZebraStorageHint.createZebraStorageHint("[word]");
- BasicTableOutputFormat.setStorageInfo(jobConf, zebraSchema, zebraStorageHint, null);
-
- // set map-only job.
- jobConf.setNumReduceTasks(0);
- jobConf.setNumMapTasks(2);
- JobClient.runJob(jobConf);
- }
- }
+ static class MapClass extends
+ Mapper<BytesWritable, Tuple, BytesWritable, Tuple> {
+ private BytesWritable bytesKey;
+ private Tuple tupleRow;
+
+ @Override
+ public void map(BytesWritable key, Tuple value, Context context)
+ throws IOException, InterruptedException {
+ System.out.println(key.toString() + value.toString());
+ context.write(key, value);
+ }
+
+ @Override
+ public void setup(Context context) {
+ bytesKey = new BytesWritable();
+ try {
+ Schema outSchema = BasicTableOutputFormat.getSchema(context);
+ tupleRow = TypesUtils.createTuple(outSchema);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (ParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void main(String[] args) throws ParseException, IOException,
+ InterruptedException, ClassNotFoundException {
+ Job job = new Job();
+ job.setJobName("tableMRSample");
+ Configuration conf = job.getConfiguration();
+ conf.set("table.output.tfile.compression", "gz");
+
+ // input settings
+ job.setInputFormatClass(TableInputFormat.class);
+ job.setOutputFormatClass(BasicTableOutputFormat.class);
+ job.setMapperClass(TableMRSample2.MapClass.class);
+
+ List<Path> paths = new ArrayList<Path>(2);
+ Path p = new Path("/user/mapredu/t1");
+ System.out.println("path = " + p);
+ paths.add(p);
+ p = new Path("/user/mapredu/t2");
+ paths.add(p);
+
+ TableInputFormat.setInputPaths(job, paths.toArray(new Path[2]));
+ TableInputFormat.setProjection(job, "word");
+ BasicTableOutputFormat.setOutputPath(job, new Path(
+ "/user/mapredu2/t1"));
+
+ BasicTableOutputFormat.setSchema(job, "word:string");
+ BasicTableOutputFormat.setStorageHint(job, "[word]");
+
+ // set map-only job.
+ job.setNumReduceTasks(0);
+ // The new API has no setNumMapTasks(); the number of map tasks
+ // is determined by the number of input splits.
+ job.waitForCompletion(true);
+ }
+ }
}
</source>
</section>
@@ -600,7 +421,7 @@ ZebraStorageHint zStorageHint = ZebraSto
ZebraSortInfo zSortInfo = ZebraSortInfo.createZebraSortInfo(sortColumnsString, MemcmpRawComparator.class);
-BasicTableOutputFormat.setStorageInfo(jobConf, zSchema, zStorageHint, zSortInfo);
+BasicTableOutputFormat.setStorageInfo(jobContext, zSchema, zStorageHint, zSortInfo);
</source>
</section>
<!-- END ZEBRA SORT EXAMPLE-->
@@ -769,9 +590,9 @@ In main()
String multiLocs = "/user/multi/us" + "," + "/user/multi/india" + "," + "/user/multi/japan";
- jobConf.setOutputFormat(BasicTableOutputFormat.class);
- BasicTableOutputFormat.setMultipleOutputPaths(jobConf, multiLocs);
- BasicTableOutputFormat.setZebraOutputPartitionClass(jobConf, MultipleOutputsTest.OutputPartitionerClass.class);
+ job.setOutputFormatClass(BasicTableOutputFormat.class);
+ BasicTableOutputFormat.setMultipleOutputPaths(job, multiLocs);
+ BasicTableOutputFormat.setZebraOutputPartitionClass(job, MultipleOutputsTest.OutputPartitionerClass.class);
Implement a partition class:
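Note on the partition class: the API table removed by this commit describes ZebraOutputPartition.getOutputPartition(BytesWritable key, Tuple value) as the method an application implements to route rows to different tables. The plain-Java sketch below shows one way such routing logic might pick a table from the comma-separated multiLocs string used above. It is a hypothetical illustration, not the MultipleOutputsTest.OutputPartitionerClass referenced in the docs: the key is a plain String so the sketch runs without Zebra or Pig, and the assumption that the returned int is a position in the comma-separated location list is not stated on this page.

```java
public class PartitionIndexSketch {
    /**
     * Returns the position of the output location whose last path component
     * equals the key, e.g. "india" maps to 1 for
     * "/user/multi/us,/user/multi/india,/user/multi/japan".
     * Throws if no location matches (a choice made for this sketch only).
     */
    static int partitionFor(String key, String commaSeparatedLocs) {
        String[] locs = commaSeparatedLocs.split(",");
        for (int i = 0; i < locs.length; i++) {
            String loc = locs[i].trim();
            // Last path component, e.g. "us" for "/user/multi/us".
            String lastComponent = loc.substring(loc.lastIndexOf('/') + 1);
            if (lastComponent.equals(key)) {
                return i;
            }
        }
        throw new IllegalArgumentException("No output location for key: " + key);
    }

    public static void main(String[] args) {
        String multiLocs = "/user/multi/us" + "," + "/user/multi/india" + "," + "/user/multi/japan";
        System.out.println(partitionFor("us", multiLocs));    // prints 0
        System.out.println(partitionFor("japan", multiLocs)); // prints 2
    }
}
```

In a real partitioner the country value would come from a field of the Tuple rather than a String argument, and a fixed lookup would typically be built once in the constructor instead of re-splitting the location string for every row.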
Modified: hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/zebra_overview.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/zebra_overview.xml?rev=926752&r1=926751&r2=926752&view=diff
==============================================================================
--- hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/zebra_overview.xml (original)
+++ hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/zebra_overview.xml Tue Mar 23 19:47:24 2010
@@ -42,7 +42,7 @@
<title>Prerequisites</title>
<p>Zebra requires:</p>
<ul>
- <li>Pig 0.6.0 or later</li>
+ <li>Pig 0.7.0 or later</li>
<li>Hadoop 0.20.1 or later</li>
</ul>
<p></p>
Modified: hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/zebra_pig.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/zebra_pig.xml?rev=926752&r1=926751&r2=926752&view=diff
==============================================================================
--- hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/zebra_pig.xml (original)
+++ hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/zebra_pig.xml Tue Mar 23 19:47:24 2010
@@ -165,7 +165,7 @@ register $LOCATION/zebra-$version.jar;
-- "s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes,
-- r1:record(f1:int, f2:long), r2:record(r3:record(f3:float, f4)),
-- m1:map(string),m2:map(map(int)),
--- c:collection(f13:double, f14:float, f15:bytes)";
+-- c:collection (r1:record(f13:double, f14:float, f15:bytes))";
-- STR_STORAGE =
-- "[s1, s2]; [m1#{a}]; [r1.f1]; [s3, s4, r2.r3.f3];
Modified: hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/zebra_reference.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/zebra_reference.xml?rev=926752&r1=926751&r2=926752&view=diff
==============================================================================
--- hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/zebra_reference.xml (original)
+++ hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/zebra_reference.xml Tue Mar 23 19:47:24 2010
@@ -193,11 +193,11 @@ If the type is omitted, the field defaul
<title>Examples</title>
<p>In this example the schema specifies names and types for 3 columns.</p>
<source>
-ZebraSchema.createZebraSchema(jobconf, "s1:string, f1:float, i1:int");
+ZebraSchema.createZebraSchema("s1:string, f1:float, i1:int");
</source>
<p>In this example the schema specifies names for 3 columns; all 3 columns default to type bytes.</p>
<source>
-ZebraSchema.createZebraSchema(jobconf, "f1, f2, f3");
+ZebraSchema.createZebraSchema("f1, f2, f3");
</source>
</section>
@@ -248,11 +248,11 @@ ZebraSchema.createZebraSchema(jobconf,
<title>Examples</title>
<p>In this example the schema specifies a record with two fields. </p>
<source>
-ZebraSchema.createZebraSchema(jobconf, "r1:record(f1:int,f2:long)");
+ZebraSchema.createZebraSchema("r1:record(f1:int,f2:long)");
</source>
<p>In this example the schema specifies a record with two fields. Note that f2 will default to type bytes.</p>
<source>
-ZebraSchema.createZebraSchema(jobconf, "r1:record(r2:record(f1:int,f2)");
+ZebraSchema.createZebraSchema("r1:record(r2:record(f1:int,f2))");
</source>
</section>
</section>
@@ -264,7 +264,7 @@ ZebraSchema.createZebraSchema(jobconf,
<p>A collection is a set of records.</p>
<section>
<title>Syntax</title>
- <p>collection_alias:collection (record)</p>
+ <p>collection_alias:collection ([record_alias:]record(...))</p>
</section>
<section>
@@ -272,7 +272,7 @@ ZebraSchema.createZebraSchema(jobconf,
<table>
<tr>
<td>collection_alias</td>
- <td>The name assigned to the collection column.</td>
+ <td>The name assigned to the collection.</td>
</tr>
<tr>
<td>:collection</td>
@@ -282,28 +282,31 @@ ZebraSchema.createZebraSchema(jobconf,
<td>( )</td>
<td>The collection notation, a set of parentheses.</td>
</tr>
+ <tr>
+ <td>record_alias</td>
+ <td>The name assigned to the record.</td>
+ </tr>
<tr>
<td> record</td>
- <td>A record, specified one of two ways: <br></br>
-• Explicit (see Record)<br></br>
-c1:collection(r1:record(f1:int,f2:long))");<br></br>
-• Implicit <br></br>
-c1:collection(f1:int,f2:long)");
-</td>
+
+ <td> The record designator. The <a href="zebra_reference.html#Schemas+for+Records">record</a>
+ can be specified with or without the record alias:
+ <ul>
+ <li>c1:collection(r1:record(f1:int,f2:long));</li>
+ <li>c1:collection(record(f1:int,f2:long));</li>
+ </ul>
+ </td>
</tr>
</table>
</section>
<section>
<title>Examples</title>
- <p>In this example the schema specifies a collection with a record consisting of two fields (explicit record notation).</p>
+ <p>In this example the schema specifies a collection of records, each consisting of two fields.</p>
<source>
-ZebraSchema.createZebraSchema(jobconf, "c1:collection(r1:record(f1:int,f2:long))");
+ZebraSchema.createZebraSchema("c1:collection(r1:record(f1:int,f2:long))");
</source>
- <p>In this example the schema specifies a collection with a record consisting of two fields (implicit record notation).</p>
-<source>
-ZebraSchema.createZebraSchema(jobconf, "c1:collection(f1:int,f2:long)");
-</source>
+
</section>
</section>
@@ -345,11 +348,11 @@ Note that the map's key is always t
<title>Examples</title>
<p>In this example the schema specifies a map with value of type string.</p>
<source>
-ZebraSchema.createZebraSchema(jobconf, "m1:map(string)");
+ZebraSchema.createZebraSchema("m1:map(string)");
</source>
<p>In this example the schema specifies a map with value of type map (with a value of type int).</p>
<source>
-ZebraSchema.createZebraSchema(jobconf, "m2:map(map(int))");
+ZebraSchema.createZebraSchema("m2:map(map(int))");
</source>
</section>
</section>