You are viewing a plain text version of this content. The link to the canonical version is not included in this copy.
Posted to commits@pig.apache.org by ya...@apache.org on 2010/02/13 01:06:17 UTC
svn commit: r909667 [3/9] - in
/hadoop/pig/branches/load-store-redesign/contrib/zebra: ./
src/java/org/apache/hadoop/zebra/ src/java/org/apache/hadoop/zebra/io/
src/java/org/apache/hadoop/zebra/mapred/
src/java/org/apache/hadoop/zebra/mapreduce/ src/ja...
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java Sat Feb 13 00:06:15 2010
@@ -19,15 +19,11 @@
package org.apache.hadoop.zebra.pig;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,558 +32,385 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.zebra.io.BasicTable;
-import org.apache.hadoop.zebra.mapred.TableInputFormat;
-import org.apache.hadoop.zebra.mapred.TableRecordReader;
+import org.apache.hadoop.zebra.mapreduce.TableInputFormat;
+import org.apache.hadoop.zebra.mapreduce.TableRecordReader;
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.schema.ColumnType;
+import org.apache.hadoop.zebra.schema.Schema;
import org.apache.hadoop.zebra.schema.Schema.ColumnSchema;
import org.apache.hadoop.zebra.types.Projection;
-import org.apache.hadoop.zebra.types.TypesUtils;
import org.apache.hadoop.zebra.types.SortInfo;
-import org.apache.pig.ExecType;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.Slice;
-import org.apache.pig.Slicer;
-import org.apache.pig.backend.datastorage.DataStorage;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.data.DataBag;
+import org.apache.pig.Expression;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.LoadPushDown;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.DefaultTupleFactory;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.optimizer.PruneColumns;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.UDFContext;
import org.apache.hadoop.zebra.pig.comparator.*;
import org.apache.pig.IndexableLoadFunc;
-import org.apache.hadoop.zebra.io.TableScanner;
/**
- * Pig IndexableLoadFunc and Slicer for Zebra Table
+ * Pig IndexableLoadFunc (also LoadMetadata and LoadPushDown) for Zebra Table
*/
-public class TableLoader implements IndexableLoadFunc, Slicer {
- static final Log LOG = LogFactory.getLog(TableLoader.class);
- private TableInputFormat inputFormat;
- private JobConf jobConf;
- private String projectionString;
- private Path[] paths;
- private TableRecordReader indexReader = null;
- private BytesWritable indexKey = null;
- private Tuple tuple;
- private org.apache.hadoop.zebra.schema.Schema schema;
+public class TableLoader extends IndexableLoadFunc implements LoadMetadata, LoadPushDown {
+    static final Log LOG = LogFactory.getLog(TableLoader.class);
+    
+    // UDF context property under which pushProjection() stores the pruned projection string
+    // and from which setProjection() reads it back.
+    private static final String UDFCONTEXT_PROJ_STRING = "zebra.UDFContext.projectionString";
+    
+    // Projection string given to the constructor, or null if none (or an empty one) was given.
+    private String projectionString;
+
+    // Reader in use: set by prepareToRead() for split reading, or by initialize() for index access.
+    private TableRecordReader tableRecordReader = null;
+    
+    // Full table schema, loaded in createIndexReader(); used to type the sort columns.
+    private Schema schema;
private SortInfo sortInfo;
private boolean sorted = false;
- private org.apache.hadoop.zebra.schema.Schema projectionSchema;
- /**
- * default constructor
- */
- public TableLoader() {
- inputFormat = new TableInputFormat();
- }
-
- /**
- * @param projectionStr
- * projection string passed from pig query.
- */
- public TableLoader(String projectionStr) {
- inputFormat = new TableInputFormat();
- projectionString = projectionStr;
- }
-
- /**
- * @param projectionStr
- * projection string passed from pig query.
- * @param sorted
- * need sorted table(s)?
- */
- public TableLoader(String projectionStr, String sorted) throws IOException {
- inputFormat = new TableInputFormat();
- if (projectionStr != null && !projectionStr.isEmpty())
- projectionString = projectionStr;
- if (sorted.equalsIgnoreCase("sorted"))
- this.sorted = true;
- else
- throw new IOException("Invalid argument to the table loader constructor: "+sorted);
- }
-
- @Override
- public void initialize(Configuration conf) throws IOException
- {
- if (conf == null)
- throw new IOException("Null Configuration passed.");
- jobConf = new JobConf(conf);
- }
-
- @Override
- public void bindTo(String filePaths, BufferedPositionedInputStream is,
- long offset, long end) throws IOException {
-
- FileInputFormat.setInputPaths(jobConf, filePaths);
- Path[] paths = FileInputFormat.getInputPaths(jobConf);
- /**
- * Performing glob pattern matching
- */
- List<Path> result = new ArrayList<Path>(paths.length);
- for (Path p : paths) {
- FileSystem fs = p.getFileSystem(jobConf);
- FileStatus[] matches = fs.globStatus(p);
- if (matches == null) {
- LOG.warn("Input path does not exist: " + p);
- }
- else if (matches.length == 0) {
- LOG.warn("Input Pattern " + p + " matches 0 files");
- } else {
- for (FileStatus globStat: matches) {
- if (globStat.isDir()) {
- result.add(globStat.getPath());
- } else {
- LOG.warn("Input path " + p + " is not a directory");
- }
- }
- }
- }
- if (result.isEmpty()) {
- throw new IOException("No table specified for input");
- }
- TableInputFormat.setInputPaths(jobConf, result.toArray(new Path[result.size()]));
-
- TableInputFormat.requireSortedTable(jobConf, null);
- sortInfo = TableInputFormat.getSortInfo(jobConf);
- schema = TableInputFormat.getSchema(jobConf);
- int numcols = schema.getNumColumns();
- tuple = TypesUtils.createTuple(numcols);
- setProjection();
- /*
- * Use all columns of a table as a projection: not an optimal approach
- * No need to call TableInputFormat.setProjection: by default use all columns
- */
- try {
- indexReader = TableInputFormat.getTableRecordReader(jobConf, null);
- } catch (ParseException e) {
- throw new IOException("Exception from TableInputFormat.getTableRecordReader: "+e.getMessage());
- }
- indexKey = new BytesWritable();
+    // Schema of the projected columns, computed by getSchema(); pushProjection() resolves
+    // Pig field indexes against it.
+    private Schema projectionSchema;
+    // Signature assigned by Pig via setUDFContextSignature(); keys this loader's UDF context properties.
+    private String udfContextSignature = null;
+    
+    // Private copy of the configuration made in initialize(), used to open the index reader.
+    private Configuration conf = null;
+    
+    // Turns seek tuples into index keys; built in initialize() from the table's sort columns.
+    private KeyGenerator keyGenerator = null;
+
+    /**
+     * Default constructor: no projection string is set and sorted input is not requested.
+     */
+    public TableLoader() {
   }
- @Override
- public void seekNear(Tuple t) throws IOException
- {
- // SortInfo sortInfo = inputFormat.getSortInfo(conf, path);
- String[] sortColNames = sortInfo.getSortColumnNames();
- byte[] types = new byte[sortColNames.length];
- for(int i =0 ; i < sortColNames.length; ++i){
- types[i] = schema.getColumn(sortColNames[i]).getType().pigDataType();
- }
- KeyGenerator builder = makeKeyBuilder(types);
- BytesWritable key = builder.generateKey(t);
-// BytesWritable key = new BytesWritable(((String) t.get(0)).getBytes());
- indexReader.seekTo(key);
- }
-
- private KeyGenerator makeKeyBuilder(byte[] elems) {
- ComparatorExpr[] exprs = new ComparatorExpr[elems.length];
- for (int i = 0; i < elems.length; ++i) {
- exprs[i] = ExprUtils.primitiveComparator(i, elems[i]);
- }
- return new KeyGenerator(ExprUtils.tupleComparator(exprs));
- }
- /**
- * @param storage
- * @param location
- * The location format follows the same convention as
- * FileInputFormat's comma-separated multiple path format.
- * @throws IOException
- */
- private void checkConf(DataStorage storage, String location) throws IOException {
- if (jobConf == null) {
- Configuration conf =
- ConfigurationUtil.toConfiguration(storage.getConfiguration());
- jobConf = new JobConf(conf);
- jobConf.setInputFormat(TableInputFormat.class);
-
- // TODO: the following code may better be moved to TableInputFormat.
- // Hack: use FileInputFormat to decode comma-separated multiple path
- // format.
-
- FileInputFormat.setInputPaths(jobConf, location);
- paths = FileInputFormat.getInputPaths(jobConf);
-
- /**
- * Performing glob pattern matching
- */
- List<Path> result = new ArrayList<Path>(paths.length);
- for (Path p : paths) {
- FileSystem fs = p.getFileSystem(jobConf);
- FileStatus[] matches = fs.globStatus(p);
- if (matches == null) {
- throw new IOException("Input path does not exist: " + p);
- }
- else if (matches.length == 0) {
- LOG.warn("Input Pattern " + p + " matches 0 files");
- } else {
- for (FileStatus globStat: matches) {
- if (globStat.isDir()) {
- result.add(globStat.getPath());
- } else {
- LOG.warn("Input path " + p + " is not a directory");
- }
- }
- }
- }
- if (result.isEmpty()) {
- throw new IOException("No table specified for input");
- }
-
- LOG.info("Total input tables to process : " + result.size());
- TableInputFormat.setInputPaths(jobConf, result.toArray(new Path[result.size()]));
- if (sorted)
- TableInputFormat.requireSortedTable(jobConf, null);
- }
- }
-
-
- private void setProjection() throws IOException {
- try {
-
- String pigLoadSignature = jobConf.get("pig.loader.signature");
- Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
- String prunedProjStr = null;
- if( pigLoadSignature != null)
- prunedProjStr = p.getProperty(pigLoadSignature);
-
- if(prunedProjStr != null ) {
- TableInputFormat.setProjection(jobConf, prunedProjStr);
- } else {
- if (projectionString != null) {
- TableInputFormat.setProjection(jobConf, projectionString);
- }
- }
- } catch (ParseException e) {
- throw new IOException("Schema parsing failed : "+e.getMessage());
- }
-
-
-
- }
-
- @Override
- public Schema determineSchema(String fileName, ExecType execType,
- DataStorage storage) throws IOException {
-
- checkConf(storage, fileName);
-
- // This is bad but its done for pig. Pig creates one loadfunc object and uses to different
- // signatures. Zebra does not modify jobConf object once created. However, we might have the new
- // signature in this function everytime.
- String pigLoadSignature = storage.getConfiguration().getProperty("pig.loader.signature");
- if( pigLoadSignature != null) {
- jobConf.set("pig.loader.signature", pigLoadSignature);
- }
- setProjection();
-
- Projection projection;
-
- org.apache.hadoop.zebra.schema.Schema tschema = TableInputFormat.getSchema(jobConf);
- try {
- projection = new org.apache.hadoop.zebra.types.Projection(tschema, TableInputFormat.getProjection(jobConf));
- projectionSchema = projection.getProjectionSchema();
- } catch (ParseException e) {
- throw new IOException("Schema parsing failed : "+e.getMessage());
+    /**
+     * Creates a loader restricted to the given projection.
+     *
+     * @param projectionStr
+     *          projection string passed from the Pig query; a null or empty value
+     *          leaves projectionString null, so no explicit projection is applied.
+     */
+    public TableLoader(String projectionStr) {
+        boolean hasProjection = projectionStr != null && projectionStr.length() > 0;
+        if( hasProjection ) {
+            projectionString = projectionStr;
+        }
   }
- if (projectionSchema == null) {
- throw new IOException("Cannot determine table projection schema");
- }
-
- try {
- return SchemaConverter.toPigSchema(projectionSchema);
- } catch (FrontendException e) {
- throw new IOException("FrontendException", e);
- }
- }
-
- @Override
- public RequiredFieldResponse fieldsToRead(RequiredFieldList requiredFieldList) throws FrontendException {
-
-
- String pigLoadSignature = requiredFieldList.getSignature();
- if(pigLoadSignature == null) {
- throw new FrontendException("Zebra Cannot have null loader signature in fieldsToRead");
- }
-
- List<RequiredField> rFields = requiredFieldList.getFields();
- if( rFields == null) {
- throw new FrontendException("requiredFieldList.getFields() can not return null in fieldsToRead");
- }
-
- Iterator<RequiredField> it= rFields.iterator();
- String projectionStr = "";
-
- while( it.hasNext()) {
- RequiredField rField = (RequiredField) it.next();
- ColumnSchema cs = projectionSchema.getColumn(rField.getIndex());
-
- if(cs == null) {
- throw new FrontendException
- ("Null column schema in projection schema in fieldsToRead at index " + rField.getIndex());
- }
-
- if(cs.getType() != ColumnType.MAP && (rField.getSubFields() != null)) {
- throw new FrontendException
- ("Zebra cannot have subfields for a non-map column type in fieldsToRead " +
- "ColumnType:" + cs.getType() + " index in zebra projection schema: " + rField.getIndex()
- );
- }
- String name = cs.getName();
- projectionStr = projectionStr + name ;
- if(cs.getType() == ColumnType.MAP) {
- List<RequiredField> subFields = rField.getSubFields();
-
- if( subFields != null ) {
-
- Iterator<RequiredField> its= subFields.iterator();
- boolean flag = false;
- if(its.hasNext()) {
- flag = true;
- projectionStr += "#" + "{";
- }
- String tmp = "";
- while(its.hasNext()) {
- RequiredField sField = (RequiredField) its.next();
- tmp = tmp + sField.getAlias();
- if(its.hasNext()) {
- tmp = tmp + "|";
- }
- }
- if ( flag) {
- projectionStr = projectionStr + tmp + "}";
- }
- }
- }
- if(it.hasNext()) {
- projectionStr = projectionStr + " , ";
- }
- }
- Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
-
- if(p == null) {
- throw new FrontendException("Zebra Cannot have null UDFCOntext property");
- }
-
- if(projectionStr != null && (projectionStr != ""))
- p.setProperty(pigLoadSignature, projectionStr);
-
- RequiredFieldResponse rfr = new RequiredFieldResponse(true);
-
- return rfr;
- }
-
- @Override
- public Tuple getNext() throws IOException {
- if (indexReader.atEnd())
- return null;
- indexReader.next(indexKey, tuple);
- return tuple;
- }
-
- @Override
- public void close() throws IOException {
- if (indexReader != null)
- indexReader.close();
- }
-
- @Override
- public Slice[] slice(DataStorage store, String location) throws IOException {
-
- checkConf(store, location);
- setProjection();
- // TableInputFormat accepts numSplits < 0 (special case for no-hint)
- InputSplit[] splits = inputFormat.getSplits(jobConf, -1);
-
- Slice[] slices = new Slice[splits.length];
- for (int nx = 0; nx < slices.length; nx++) {
- slices[nx] = new TableSlice(jobConf, splits[nx], sorted);
- }
-
- return slices;
- }
-
- @Override
- public void validate(DataStorage store, String location) throws IOException {
- checkConf(store, location);
- }
-
- static class TableSlice implements Slice {
- private static final long serialVersionUID = 1L;
- private static final Class[] emptyArray = new Class[] {};
-
- private TreeMap<String, String> configMap;
- private InputSplit split;
-
- transient private JobConf conf;
- transient private int numProjCols = 0;
- transient private RecordReader<BytesWritable, Tuple> scanner;
- transient private BytesWritable key;
- transient private boolean sorted = false;
-
- TableSlice(JobConf conf, InputSplit split, boolean sorted) {
- // hack: expecting JobConf contains nothing but a <string, string>
- // key-value pair store.
- configMap = new TreeMap<String, String>();
- for (Iterator<Map.Entry<String, String>> it = conf.iterator(); it.hasNext();) {
- Map.Entry<String, String> e = it.next();
- configMap.put(e.getKey(), e.getValue());
- }
-
-
-
- this.split = split;
- this.sorted = sorted;
- }
-
- @Override
- public void close() throws IOException {
- if (scanner == null) {
- throw new IOException("Slice not initialized");
- }
- scanner.close();
- }
-
- @Override
- public long getLength() {
- try {
- return split.getLength();
- } catch (IOException e) {
- throw new RuntimeException("IOException", e);
- }
- }
-
- @Override
- public String[] getLocations() {
- try {
- return split.getLocations();
- } catch (IOException e) {
- throw new RuntimeException("IOException", e);
- }
- }
-
- @Override
- public long getPos() throws IOException {
- if (scanner == null) {
- throw new IOException("Slice not initialized");
- }
- return scanner.getPos();
- }
-
- @Override
- public float getProgress() throws IOException {
- if (scanner == null) {
- throw new IOException("Slice not initialized");
- }
- return scanner.getProgress();
- }
-
- @Override
- public long getStart() {
- return 0;
- }
-
- @Override
- public void init(DataStorage store) throws IOException {
- Configuration localConf = new Configuration();
- for (Iterator<Map.Entry<String, String>> it =
- configMap.entrySet().iterator(); it.hasNext();) {
- Map.Entry<String, String> e = it.next();
- localConf.set(e.getKey(), e.getValue());
- }
- conf = new JobConf(localConf);
- String projection;
- try
- {
- projection = TableInputFormat.getProjection(conf);
- } catch (ParseException e) {
- throw new IOException("Schema parsing failed :"+e.getMessage());
- }
- numProjCols = Projection.getNumColumns(projection);
- TableInputFormat inputFormat = new TableInputFormat();
- if (sorted)
- TableInputFormat.requireSortedTable(conf, null);
- scanner = inputFormat.getRecordReader(split, conf, Reporter.NULL);
- key = new BytesWritable();
- }
-
- @Override
- public boolean next(Tuple value) throws IOException {
-
- TypesUtils.formatTuple(value, numProjCols);
- return scanner.next(key, value);
- }
-
- private void writeObject(ObjectOutputStream out) throws IOException {
- out.writeObject(configMap);
- out.writeObject(split.getClass().getName());
- split.write(out);
- }
+    /**
+     * Creates a loader restricted to the given projection that requires sorted table(s).
+     *
+     * @param projectionStr
+     *          projection string passed from the Pig query.
+     * @param sorted
+     *          must be "sorted" (case-insensitive) to request sorted table(s).
+     * @throws IOException
+     *          if {@code sorted} is null or any value other than "sorted".
+     */
+    public TableLoader(String projectionStr, String sorted) throws IOException {
+        this( projectionStr );
+
+        // Compare against the literal so a null argument is reported as an invalid
+        // argument instead of surfacing as a NullPointerException.
+        if( "sorted".equalsIgnoreCase( sorted ) ) {
+            this.sorted = true;
+        } else {
+            throw new IOException( "Invalid argument to the table loader constructor: " + sorted );
+        }
+    }
+
+    /**
+     * Prepares this loader for index-based access through seekNear() and getNext().
+     *
+     * Copies the given configuration (minus any inherited table projection), opens an
+     * index reader over the sorted table and builds the key generator from the table's
+     * sort columns.
+     *
+     * @param conf configuration supplied by Pig; must not be null.
+     * @throws IOException if conf is null, sort information or a sort column is missing,
+     *         or the index reader cannot be created.
+     */
+    @Override
+    public void initialize(Configuration conf) throws IOException {
+        if( conf == null )
+            throw new IOException( "Null Configuration passed." );
+
+        // Here we do an ugly workaround for a problem in Pig: the passed-in conf contains a
+        // value for TableInputFormat.INPUT_PROJ that was set by the left table's execution in a
+        // merge join. To get rid of that side effect, copy every entry except that one.
+        this.conf = new Configuration( false );
+        Iterator<Map.Entry<String, String>> it = conf.iterator();
+        while( it.hasNext() ) {
+            Map.Entry<String, String> entry = it.next();
+            String key = entry.getKey();
+            if( key.equals( "mapreduce.lib.table.input.projection" ) ) // The string const is defined in TableInputFormat.
+                continue;
+            this.conf.set( key, entry.getValue() );
+        }
+
+        tableRecordReader = createIndexReader();
+
+        // createIndexReader() forces sorted mode, so setProjection() should have fetched the sort info.
+        if( sortInfo == null ) {
+            close();
+            throw new IOException( "No sort info available; indexed loading requires a sorted table" );
+        }
+
+        // One primitive comparator per sort column, typed by the column's Pig data type.
+        String[] sortColNames = sortInfo.getSortColumnNames();
+        byte[] types = new byte[sortColNames.length];
+        for( int i = 0; i < sortColNames.length; ++i ) {
+            ColumnSchema column = schema.getColumn( sortColNames[i] );
+            if( column == null ) {
+                close();
+                throw new IOException( "Sort column " + sortColNames[i] + " not found in table schema" );
+            }
+            types[i] = column.getType().pigDataType();
+        }
+        keyGenerator = makeKeyBuilder( types );
+    }
+
+    /**
+     * Moves the index reader to the position for the given key tuple.
+     * This method is called only once.
+     *
+     * @param tuple key tuple; its fields are compared position by position using the
+     *        sort-column types captured in initialize()
+     */
+    @Override
+    public void seekNear(Tuple tuple) throws IOException {
+        tableRecordReader.seekTo( keyGenerator.generateKey( tuple ) );
+    }
- @SuppressWarnings("unchecked")
- private void readObject(ObjectInputStream in) throws IOException,
- ClassNotFoundException {
- configMap = (TreeMap<String, String>) in.readObject();
- String className = (String) in.readObject();
- Class<InputSplit> clazz = (Class<InputSplit>) Class.forName(className);
- try {
- Constructor<InputSplit> meth = clazz.getDeclaredConstructor(emptyArray);
- meth.setAccessible(true);
- split = meth.newInstance();
- } catch (Exception e) {
- throw new ClassNotFoundException("Cannot create instance", e);
- }
- split.readFields(in);
- }
- }
-
- @Override
- public DataBag bytesToBag(byte[] b) throws IOException {
- throw new IOException("Not implemented");
- }
-
- @Override
- public String bytesToCharArray(byte[] b) throws IOException {
- throw new IOException("Not implemented");
- }
-
- @Override
- public Double bytesToDouble(byte[] b) throws IOException {
- throw new IOException("Not implemented");
- }
-
- @Override
- public Float bytesToFloat(byte[] b) throws IOException {
- throw new IOException("Not implemented");
- }
-
- @Override
- public Integer bytesToInteger(byte[] b) throws IOException {
- throw new IOException("Not implemented");
- }
-
- @Override
- public Long bytesToLong(byte[] b) throws IOException {
- throw new IOException("Not implemented");
- }
-
- public Map<String, Object> bytesToMap(byte[] b) throws IOException {
- throw new IOException("Not implemented");
- }
-
- @Override
- public Tuple bytesToTuple(byte[] b) throws IOException {
- throw new IOException("Not implemented");
- }
+    /**
+     * Opens a table record reader over the configured input for index-based access.
+     * Loads the table schema, forces sorted mode (an index reader needs a sorted table)
+     * and applies the projection before creating the reader.
+     *
+     * @return the newly created reader
+     * @throws IOException if the projection cannot be parsed, the reader cannot be
+     *         created, or the thread is interrupted
+     */
+    private TableRecordReader createIndexReader() throws IOException {
+        Job job = new Job( conf );
+
+        // Obtain the schema and sort info. For an index reader, the table must be sorted.
+        schema = TableInputFormat.getSchema( job );
+        sorted = true;
+
+        setProjection( job );
+
+        try {
+            return TableInputFormat.createTableRecordReader( job, TableInputFormat.getProjection( job ) );
+        } catch( ParseException ex ) {
+            throw new IOException( "Exception from TableInputFormat.createTableRecordReader: " + ex.getMessage(), ex );
+        } catch( InterruptedException ex ) {
+            // Restore the interrupt status so code further up the stack can still observe it.
+            Thread.currentThread().interrupt();
+            throw new IOException( "Exception from TableInputFormat.createTableRecordReader: " + ex.getMessage(), ex );
+        }
+    }
+
+    /**
+     * Applies the column projection to the job. This method does more than set the projection:
+     * when sorted input is required, it also marks the table as requiring sort order and
+     * captures its sort info.
+     *
+     * A pruned projection stored in the UDF context (by pushProjection(), under this loader's
+     * signature) takes precedence over the projection string given to the constructor. If
+     * neither exists, no projection is set on the job.
+     *
+     * @param job the job to configure
+     * @throws IOException if the projection string cannot be parsed or a TableInputFormat call fails
+     */
+    private void setProjection(Job job) throws IOException {
+        if( sorted ) {
+            TableInputFormat.requireSortedTable( job, null );
+            sortInfo = TableInputFormat.getSortInfo( job );
+        }
+
+        try {
+            Properties properties = UDFContext.getUDFContext().getUDFProperties(
+                    this.getClass(), new String[]{ udfContextSignature } );
+            String prunedProjStr = properties.getProperty( UDFCONTEXT_PROJ_STRING );
+
+            if( prunedProjStr != null ) {
+                TableInputFormat.setProjection( job, prunedProjStr );
+            } else if( projectionString != null ) {
+                TableInputFormat.setProjection( job, projectionString );
+            }
+        } catch( ParseException ex ) {
+            throw new IOException( "Schema parsing failed : " + ex.getMessage(), ex );
+        }
+    }
+
+    /**
+     * Builds a key generator that compares tuples field by field. Field {@code i} uses a
+     * primitive comparator for the Pig data type {@code elems[i]}.
+     *
+     * @param elems Pig data type of each key field, in key order
+     * @return the key generator
+     */
+    private KeyGenerator makeKeyBuilder(byte[] elems) {
+        int fieldCount = elems.length;
+        ComparatorExpr[] fieldComparators = new ComparatorExpr[fieldCount];
+        int pos = 0;
+        while( pos < fieldCount ) {
+            fieldComparators[pos] = ExprUtils.primitiveComparator( pos, elems[pos] );
+            pos++;
+        }
+        return new KeyGenerator( ExprUtils.tupleComparator( fieldComparators ) );
+    }
+
+    /**
+     * Expands a Pig load location into the table directories it names.
+     *
+     * Hack: FileInputFormat is used only to decode the comma-separated multiple-path format.
+     * Each decoded path is then glob-expanded; matches that are not directories are skipped
+     * with a warning.
+     *
+     * @param location comma-separated list of paths or glob patterns
+     * @param conf configuration used to resolve each path's file system
+     * @return the matching directories; never empty
+     * @throws IOException if a path does not exist or no directory matched at all
+     */
+    private static Path[] getPathsFromLocation(String location, Configuration conf) throws IOException {
+        Job decoder = new Job( conf );
+        FileInputFormat.setInputPaths( decoder, location );
+        Path[] patterns = FileInputFormat.getInputPaths( decoder );
+
+        // Glob-expand every pattern, keeping only directories.
+        List<Path> tables = new ArrayList<Path>( patterns.length );
+        for( int i = 0; i < patterns.length; i++ ) {
+            Path pattern = patterns[i];
+            FileStatus[] matches = pattern.getFileSystem( conf ).globStatus( pattern );
+            if( matches == null )
+                throw new IOException( "Input path does not exist: " + pattern );
+            if( matches.length == 0 ) {
+                LOG.warn( "Input Pattern " + pattern + " matches 0 files" );
+                continue;
+            }
+            for( FileStatus status : matches ) {
+                if( !status.isDir() ) {
+                    LOG.warn( "Input path " + pattern + " is not a directory" );
+                    continue;
+                }
+                tables.add( status.getPath() );
+            }
+        }
+
+        if( tables.isEmpty() )
+            throw new IOException( "No table specified for input" );
+
+        LOG.info( "Total input tables to process : " + tables.size() );
+        return tables.toArray( new Path[tables.size()] );
+    }
+
+    /**
+     * Returns the next row, or null once the reader has no more rows.
+     *
+     * @return a new tuple holding the row's fields, or null at end of input
+     * @throws IOException on read failure or if the thread is interrupted
+     */
+    @Override
+    public Tuple getNext() throws IOException {
+        if( tableRecordReader.atEnd() )
+            return null;
+        try {
+            // Do not read a stale or absent current value if the reader fails to advance.
+            if( !tableRecordReader.nextKeyValue() )
+                return null;
+            // Copy the fields into a fresh tuple instead of handing out the reader's value object
+            // (presumably because the reader may reuse it between calls -- confirm).
+            ArrayList<Object> fields = new ArrayList<Object>( tableRecordReader.getCurrentValue().getAll() );
+            return DefaultTupleFactory.getInstance().newTuple( fields );
+        } catch( InterruptedException ex ) {
+            // Restore the interrupt status before translating the exception.
+            Thread.currentThread().interrupt();
+            throw new IOException( "InterruptedException:" + ex, ex );
+        }
+    }
+
+    /**
+     * Closes the table record reader, if one has been set.
+     */
+    @Override
+    public void close() throws IOException {
+        TableRecordReader reader = tableRecordReader;
+        if( reader == null )
+            return;
+        reader.close();
+    }
+
+    /**
+     * Stores the record reader this loader reads rows from.
+     *
+     * @param reader must be a non-null TableRecordReader
+     * @param split the split being read (not used here)
+     * @throws IOException if reader is null or not a TableRecordReader
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void prepareToRead(org.apache.hadoop.mapreduce.RecordReader reader, PigSplit split)
+    throws IOException {
+        // Check the type before casting, so an unexpected reader yields this IOException rather
+        // than a ClassCastException; instanceof is also false for null.
+        if( !( reader instanceof TableRecordReader ) )
+            throw new IOException( "Invalid object type passed to TableLoader" );
+        tableRecordReader = (TableRecordReader)reader;
+    }
+
+    /**
+     * Points the job at the table(s) named by location, then applies the projection.
+     *
+     * Applying the projection goes beyond setting the location, but this is the only place
+     * where it can be done, and it is what the Pig team suggested.
+     *
+     * @param location comma-separated table paths or glob patterns
+     * @param job the job to configure
+     * @throws IOException if the location resolves to no table or the projection is invalid
+     */
+    @Override
+    public void setLocation(String location, Job job) throws IOException {
+        TableInputFormat.setInputPaths( job, getPathsFromLocation( location, job.getConfiguration() ) );
+        setProjection( job );
+    }
+
+    /**
+     * @return a new TableInputFormat instance
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public InputFormat getInputFormat() throws IOException {
+        TableInputFormat format = new TableInputFormat();
+        return format;
+    }
+
+    /**
+     * Zebra tables expose no partition keys.
+     *
+     * @return always null
+     */
+    @Override
+    public String[] getPartitionKeys(String location, Configuration conf)
+    throws IOException {
+        return null;
+    }
+
+    /**
+     * Returns the Pig schema of the projected columns of the table(s) at location.
+     *
+     * With several tables, their schemas are unioned. The projection (pruned or from the
+     * constructor) is then applied, and the projected schema is remembered in
+     * projectionSchema for pushProjection().
+     *
+     * @param location comma-separated table paths or glob patterns
+     * @param conf configuration used to access the tables
+     * @return the projected schema converted to a Pig ResourceSchema
+     * @throws IOException if a table cannot be read or a schema or projection cannot be parsed
+     */
+    @Override
+    public ResourceSchema getSchema(String location, Configuration conf) throws IOException {
+        Path[] paths = getPathsFromLocation( location, conf );
+        Job job = new Job( conf );
+        TableInputFormat.setInputPaths( job, paths );
+
+        Schema tableSchema;
+        if( paths.length == 1 ) {
+            tableSchema = BasicTable.Reader.getSchema( paths[0], job.getConfiguration() );
+        } else {
+            tableSchema = new Schema();
+            for( Path p : paths ) {
+                // Named pathSchema so it does not shadow the schema field.
+                Schema pathSchema = BasicTable.Reader.getSchema( p, job.getConfiguration() );
+                try {
+                    tableSchema.unionSchema( pathSchema );
+                } catch( ParseException e ) {
+                    throw new IOException( e.getMessage(), e );
+                }
+            }
+        }
+
+        setProjection( job );
+
+        // Remember the full table schema in case building the projection below fails.
+        projectionSchema = tableSchema;
+        try {
+            Projection projection = new Projection( tableSchema, TableInputFormat.getProjection( job ) );
+            projectionSchema = projection.getProjectionSchema();
+        } catch( ParseException e ) {
+            throw new IOException( "Schema parsing failed : " + e.getMessage(), e );
+        }
+
+        if( projectionSchema == null ) {
+            throw new IOException( "Cannot determine table projection schema" );
+        }
+
+        return SchemaConverter.convertToResourceSchema( projectionSchema );
+    }
+
+    /**
+     * Statistics are not supported.
+     *
+     * @return always null
+     */
+    @Override
+    public ResourceStatistics getStatistics(String location, Configuration conf)
+    throws IOException {
+        return null;
+    }
+
+    /**
+     * No-op. It should never be called, since getPartitionKeys returns null.
+     */
+    @Override
+    public void setPartitionFilter(Expression partitionFilter)
+    throws IOException {
+        // Intentionally empty.
+    }
+
+    /**
+     * @return a mutable list containing only PROJECTION, the single push-down supported here
+     */
+    @Override
+    public List<OperatorSet> getFeatures() {
+        List<OperatorSet> supported = new ArrayList<OperatorSet>( 1 );
+        supported.add( OperatorSet.PROJECTION );
+        return supported;
+    }
+
+    /**
+     * Records the columns Pig actually needs, so that only those are read.
+     *
+     * Builds a Zebra projection string from the required fields, for example
+     * {@code a , m#{k1|k2}}. It stores the string in the UDF context under this loader's
+     * signature, where setProjection() picks it up. Field indexes refer to projectionSchema,
+     * which getSchema() computes.
+     *
+     * @param requiredFieldList the fields Pig requires
+     * @return a RequiredFieldResponse constructed with true
+     * @throws FrontendException if the field list is malformed or the projection schema is unknown
+     */
+    @Override
+    public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
+    throws FrontendException {
+        if( requiredFieldList == null ) {
+            throw new FrontendException("requiredFieldList can not be null in pushProjection");
+        }
+        List<RequiredField> rFields = requiredFieldList.getFields();
+        if( rFields == null) {
+            throw new FrontendException("requiredFieldList.getFields() can not return null in fieldsToRead");
+        }
+        // Field indexes are resolved against the schema computed by getSchema().
+        if( projectionSchema == null ) {
+            throw new FrontendException("Zebra cannot push projection before the table schema is determined");
+        }
+
+        StringBuilder projectionStr = new StringBuilder();
+        Iterator<RequiredField> it = rFields.iterator();
+        while( it.hasNext() ) {
+            RequiredField rField = it.next();
+            ColumnSchema cs = projectionSchema.getColumn( rField.getIndex() );
+
+            if( cs == null ) {
+                throw new FrontendException
+                ("Null column schema in projection schema in fieldsToRead at index " + rField.getIndex());
+            }
+
+            if( cs.getType() != ColumnType.MAP && rField.getSubFields() != null ) {
+                throw new FrontendException
+                ("Zebra cannot have subfields for a non-map column type in fieldsToRead " +
+                 "ColumnType:" + cs.getType() + " index in zebra projection schema: " + rField.getIndex()
+                );
+            }
+
+            projectionStr.append( cs.getName() );
+            if( cs.getType() == ColumnType.MAP ) {
+                appendMapKeys( projectionStr, rField.getSubFields() );
+            }
+            if( it.hasNext() ) {
+                projectionStr.append( " , " );
+            }
+        }
+
+        Properties properties = UDFContext.getUDFContext().getUDFProperties(
+                this.getClass(), new String[]{ udfContextSignature } );
+        if( projectionStr.length() > 0 ) {
+            properties.setProperty( UDFCONTEXT_PROJ_STRING, projectionStr.toString() );
+        }
+
+        return new RequiredFieldResponse( true );
+    }
+
+    /**
+     * Appends the map-key selector "#{k1|k2|...}" built from the subfields' aliases.
+     * Appends nothing when subFields is null or empty.
+     */
+    private static void appendMapKeys(StringBuilder projectionStr, List<RequiredField> subFields) {
+        if( subFields == null || subFields.isEmpty() ) {
+            return;
+        }
+        projectionStr.append( "#{" );
+        Iterator<RequiredField> its = subFields.iterator();
+        while( its.hasNext() ) {
+            projectionStr.append( its.next().getAlias() );
+            if( its.hasNext() ) {
+                projectionStr.append( "|" );
+            }
+        }
+        projectionStr.append( "}" );
+    }
+
+    /**
+     * Stores the signature Pig assigns to this loader. The signature keys this loader's
+     * UDF context properties in setProjection() and pushProjection().
+     */
+    @Override
+    public void setUDFContextSignature(String signature) {
+        this.udfContextSignature = signature;
+    }
}
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java Sat Feb 13 00:06:15 2010
@@ -19,19 +19,17 @@
package org.apache.hadoop.zebra.pig;
import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.Constructor;
-import java.util.List;
+import java.util.Properties;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.zebra.pig.comparator.ComparatorExpr;
import org.apache.hadoop.zebra.pig.comparator.ExprUtils;
import org.apache.hadoop.zebra.pig.comparator.KeyGenerator;
@@ -40,66 +38,132 @@
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.types.SortInfo;
import org.apache.hadoop.zebra.types.TypesUtils;
-import org.apache.pig.StoreConfig;
-import org.apache.pig.CommittableStoreFunc;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreMetadata;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
/**
- * Pig CommittableStoreFunc for Zebra Table
+ * Pig StoreFunc implementation for Zebra Table
*/
-public class TableStorer implements CommittableStoreFunc {
- private String storageHintString;
+public class TableStorer implements StoreFunc, StoreMetadata {
+  // UDFContext keys used to carry the results of checkSchema() over to
+  // setStoreLocation().
+  private static final String UDFCONTEXT_OUTPUT_SCHEMA = "zebra.UDFContext.outputSchema";
+  private static final String UDFCONTEXT_SORT_INFO = "zebra.UDFContext.sortInfo";
+
+  // Job configuration keys written by setStoreLocation() and read back by
+  // TableOutputFormat and TableRecordWriter.
+  static final String OUTPUT_STORAGEHINT = "mapreduce.lib.table.output.storagehint";
+  static final String OUTPUT_SCHEMA = "mapreduce.lib.table.output.schema";
+  static final String OUTPUT_PATH = "mapreduce.lib.table.output.dir";
+  static final String SORT_INFO = "mapreduce.lib.table.sort.info";
+
+  private String storageHintString = null;
+  private String udfContextSignature = null;
+  private TableRecordWriter tableRecordWriter = null;
- public TableStorer() {
- }
+  /** Creates a storer without a storage hint. */
+  public TableStorer() {
+  }
+
+  /**
+   * Creates a storer that forwards the given storage hint to the table writer.
+   *
+   * @param storageHintStr Zebra storage hint string; may be null
+   */
+  public TableStorer(String storageHintStr) throws ParseException, IOException {
+    storageHintString = storageHintStr;
+  }
+
+  /** Writes one tuple through the writer handed to prepareToWrite(). */
+  @Override
+  public void putNext(Tuple tuple) throws IOException {
+    tableRecordWriter.write( null, tuple );
+  }
+
+  /**
+   * Validates the output schema and records the derived Zebra schema string and
+   * the comma-separated sort column names in this storer's UDFContext properties.
+   *
+   * @param schema Pig schema of the relation being stored
+   * @throws IOException if a sort column has no name or is not of an atomic
+   *         type, or if the schema cannot be converted to a Zebra schema
+   */
+  @Override
+  public void checkSchema(ResourceSchema schema) throws IOException {
+    ResourceFieldSchema[] fields = schema.getFields();
+    int[] index = schema.getSortKeys();
+    StringBuilder sortColumnNames = new StringBuilder();
+    // NOTE(review): null guard is defensive; confirm whether getSortKeys()
+    // can return null for an unsorted relation.
+    if( index != null ) {
+      for( int i = 0; i < index.length; i++ ) {
+        ResourceFieldSchema field = fields[index[i]];
+        String name = field.getName();
+        if( name == null ) {
+          throw new IOException("Zebra does not support column positional reference yet");
+        }
+        if( !org.apache.pig.data.DataType.isAtomic( field.getType() ) ) {
+          throw new IOException( "Field [" + name + "] is not of simple type as required for a sort column now." );
+        }
+        if( i > 0 ) {
+          sortColumnNames.append( "," );
+        }
+        sortColumnNames.append( name );
+      }
+    }
+
+    // Convert resource schema to zebra schema; keep the parser error as the cause.
+    org.apache.hadoop.zebra.schema.Schema zebraSchema;
+    try {
+      zebraSchema = SchemaConverter.convertFromResourceSchema( schema );
+    } catch (ParseException ex) {
+      throw new IOException("Exception thrown from SchemaConverter: " + ex.getMessage(), ex );
+    }
+
+    Properties properties = UDFContext.getUDFContext().getUDFProperties(
+        this.getClass(), new String[]{ udfContextSignature } );
+    properties.setProperty( UDFCONTEXT_OUTPUT_SCHEMA, zebraSchema.toString() );
+    properties.setProperty( UDFCONTEXT_SORT_INFO, sortColumnNames.toString() );
+  }
+
+  /** Returns a new TableOutputFormat. */
+  @SuppressWarnings("unchecked")
+  @Override
+  public org.apache.hadoop.mapreduce.OutputFormat getOutputFormat()
+      throws IOException {
+    return new TableOutputFormat();
+  }
+
+  /**
+   * Remembers the record writer that putNext() will write to.
+   *
+   * @throws IOException if writer is null or not a TableRecordWriter
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void prepareToWrite(RecordWriter writer)
+      throws IOException {
+    // Check the type before casting: a bare cast would throw ClassCastException
+    // for a foreign writer instead of this descriptive IOException.
+    if( !( writer instanceof TableRecordWriter ) ) {
+      throw new IOException( "Invalid type of writer. Expected type: TableRecordWriter." );
+    }
+    tableRecordWriter = (TableRecordWriter)writer;
+  }
- public TableStorer(String storageHintStr) throws ParseException, IOException {
- storageHintString = storageHintStr;
- }
-
- @Override
- public void bindTo(OutputStream os) throws IOException {
- // no op
- }
-
- @Override
- public void finish() throws IOException {
- // no op
- }
-
- @Override
- public void putNext(Tuple f) throws IOException {
- // no op
- }
-
- @Override
- public Class getStorePreparationClass() throws IOException {
- return TableOutputFormat.class;
- }
-
- public String getStorageHintString() {
- return storageHintString;
- }
-
- private static final Class[] emptyArray = new Class[] {};
-
- static public void main(String[] args) throws SecurityException, NoSuchMethodException {
- Constructor meth = TableOutputFormat.class.getDeclaredConstructor(emptyArray);
- }
-
- @Override
- public void commit(Configuration conf) throws IOException {
- try {
- JobConf job = new JobConf(conf);
- StoreConfig storeConfig = MapRedUtil.getStoreConfig(job);
- BasicTable.Writer write = new BasicTable.Writer(new Path(storeConfig.getLocation()), job);
- write.close();
- } catch (IOException ee) {
- throw ee;
+  /** Resolves location against curDir via LoadFunc.getAbsolutePath(). */
+  @Override
+  public String relToAbsPathForStoreLocation(String location, Path curDir)
+      throws IOException {
+    return LoadFunc.getAbsolutePath( location, curDir );
    }
- }
+
+  /**
+   * Copies the storage hint, the output path, and the schema / sort info that
+   * checkSchema() recorded in the UDFContext into the job configuration.
+   * Values that are null (no storage hint, or checkSchema() not run) are skipped.
+   */
+  @Override
+  public void setStoreLocation(String location, Job job) throws IOException {
+    Configuration conf = job.getConfiguration();
+    setIfNotNull( conf, OUTPUT_STORAGEHINT, storageHintString );
+    conf.set( OUTPUT_PATH, location );
+
+    // Get schema string and sorting info from UDFContext and re-store them to
+    // job config.
+    Properties properties = UDFContext.getUDFContext().getUDFProperties(
+        this.getClass(), new String[]{ udfContextSignature } );
+    setIfNotNull( conf, OUTPUT_SCHEMA, properties.getProperty( UDFCONTEXT_OUTPUT_SCHEMA ) );
+    setIfNotNull( conf, SORT_INFO, properties.getProperty( UDFCONTEXT_SORT_INFO ) );
+  }
+
+  /**
+   * Sets key to value in conf only when value is non-null. Configuration.set()
+   * does not accept null values, and null here simply means "not specified".
+   */
+  private static void setIfNotNull(Configuration conf, String key, String value) {
+    if( value != null ) {
+      conf.set( key, value );
+    }
+  }
+
+  /**
+   * Opens a BasicTable.Writer on the table at location and closes it; the
+   * schema argument is not used. NOTE(review): presumably this finalizes the
+   * table once all tasks have written - confirm against BasicTable.Writer.close().
+   */
+  @Override
+  public void storeSchema(ResourceSchema schema, String location, Configuration conf)
+      throws IOException {
+    BasicTable.Writer write = new BasicTable.Writer( new Path( location ), conf );
+    write.close();
+  }
+
+  /** Stores the signature used to key this storer's UDFContext properties. */
+  @Override
+  public void setStoreFuncUDFContextSignature(String signature) {
+    udfContextSignature = signature;
+  }
+
+  /** Statistics are not recorded. */
+  @Override
+  public void storeStatistics(ResourceStatistics stats, String location,
+      Configuration conf) throws IOException {
+    // no-op
+  }
+
}
/**
@@ -107,58 +171,72 @@
* Table OutputFormat
*
*/
-class TableOutputFormat implements OutputFormat<BytesWritable, Tuple> {
- @Override
- public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
- StoreConfig storeConfig = MapRedUtil.getStoreConfig(job);
- String location = storeConfig.getLocation(), schemaStr;
- Schema schema = storeConfig.getSchema();
- org.apache.pig.SortInfo pigSortInfo = storeConfig.getSortInfo();
-
- /* TODO
- * use a home-brewn comparator ??
- */
- String comparator = null;
- String sortColumnNames = null;
- if (pigSortInfo != null)
- {
- List<org.apache.pig.SortColInfo> sortColumns = pigSortInfo.getSortColInfoList();
- StringBuilder sb = new StringBuilder();
- if (sortColumns != null && sortColumns.size() >0)
- {
- org.apache.pig.SortColInfo sortColumn;
- String sortColumnName;
- for (int i = 0; i < sortColumns.size(); i++)
- {
- sortColumn = sortColumns.get(i);
- sortColumnName = sortColumn.getColName();
- if (sortColumnName == null)
- throw new IOException("Zebra does not support column positional reference yet");
- if (!org.apache.pig.data.DataType.isAtomic(schema.getField(sortColumnName).type))
- throw new IOException(schema.getField(sortColumnName).alias+" is not of simple type as required for a sort column now.");
- if (i > 0)
- sb.append(",");
- sb.append(sortColumnName);
- }
- sortColumnNames = sb.toString();
- }
- }
- try {
- schemaStr = SchemaConverter.fromPigSchema(schema).toString();
- } catch (ParseException e) {
- throw new IOException("Exception thrown from SchemaConverter: " + e.getMessage());
- }
- TableStorer storeFunc = (TableStorer)MapRedUtil.getStoreFunc(job);
- BasicTable.Writer writer = new BasicTable.Writer(new Path(location),
- schemaStr, storeFunc.getStorageHintString(), sortColumnNames, comparator, job);
- writer.finish();
- }
-
- @Override
- public RecordWriter<BytesWritable, Tuple> getRecordWriter(FileSystem ignored,
- JobConf job, String name, Progressable progress) throws IOException {
- return new TableRecordWriter(name, job);
- }
+/**
+ * OutputFormat that stores Pig tuples in a Zebra BasicTable, driven by the
+ * settings TableStorer.setStoreLocation() places in the job configuration.
+ */
+class TableOutputFormat extends OutputFormat<BytesWritable, Tuple> {
+  /**
+   * Opens a BasicTable.Writer at the configured output path with the configured
+   * schema, storage hint and sort columns (no custom comparator), then finishes
+   * it. NOTE(review): presumably this sets the table up before tasks write -
+   * confirm against BasicTable.Writer.
+   */
+  @Override
+  public void checkOutputSpecs(JobContext job) throws IOException, InterruptedException {
+    Configuration jobConf = job.getConfiguration();
+    Path tablePath = new Path( jobConf.get( TableStorer.OUTPUT_PATH ) );
+    BasicTable.Writer tableWriter = new BasicTable.Writer( tablePath,
+        jobConf.get( TableStorer.OUTPUT_SCHEMA ),
+        jobConf.get( TableStorer.OUTPUT_STORAGEHINT ),
+        jobConf.get( TableStorer.SORT_INFO ),
+        null, jobConf );
+    tableWriter.finish();
+  }
+
+  /** Returns a fresh TableOutputCommitter for every call. */
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext taContext)
+      throws IOException, InterruptedException {
+    OutputCommitter committer = new TableOutputCommitter();
+    return committer;
+  }
+
+  /** Returns a TableRecordWriter bound to the given task attempt. */
+  @Override
+  public org.apache.hadoop.mapreduce.RecordWriter<BytesWritable, Tuple> getRecordWriter(
+      TaskAttemptContext taContext) throws IOException, InterruptedException {
+    org.apache.hadoop.mapreduce.RecordWriter<BytesWritable, Tuple> recordWriter =
+        new TableRecordWriter( taContext );
+    return recordWriter;
+  }
+
+}
+
+// TODO: make corresponding changes for commit and cleanup. Currently, no-ops.
+/**
+ * OutputCommitter for Zebra table output. Every hook is currently a no-op and
+ * needsTaskCommit() returns false, so no task-level commit is requested.
+ */
+class TableOutputCommitter extends OutputCommitter {
+  @Override
+  public void abortTask(TaskAttemptContext taContext) throws IOException {
+    // no-op
+  }
+
+  @Override
+  public void cleanupJob(JobContext jobContext) throws IOException {
+    // no-op. NOTE(review): a job-level close of the table (opening a
+    // BasicTable.Writer on TableStorer.OUTPUT_PATH and closing it) was drafted
+    // here but is currently disabled.
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext taContext) throws IOException {
+    // no-op; the framework skips task commit because needsTaskCommit() is false.
+  }
+
+  /** Always false: tasks have nothing to commit. */
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext taContext) throws IOException {
+    return false;
+  }
+
+  @Override
+  public void setupJob(JobContext jobContext) throws IOException {
+    // no-op
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext taContext) throws IOException {
+    // no-op
+  }
+
}
/**
@@ -166,65 +244,64 @@
* Table RecordWriter
*
*/
-class TableRecordWriter implements RecordWriter<BytesWritable, Tuple> {
- final private BytesWritable KEY0 = new BytesWritable(new byte[0]);
- private BasicTable.Writer writer;
- private TableInserter inserter;
- private int[] sortColIndices = null;
- KeyGenerator builder;
- Tuple t;
-
- public TableRecordWriter(String name, JobConf conf) throws IOException {
- StoreConfig storeConfig = MapRedUtil.getStoreConfig(conf);
- String location = storeConfig.getLocation();
-
- // TODO: how to get? 1) column group splits; 2) flag of sorted-ness,
- // compression, etc.
- writer = new BasicTable.Writer(new Path(location), conf);
-
- if (writer.getSortInfo() != null)
- {
- sortColIndices = writer.getSortInfo().getSortIndices();
- SortInfo sortInfo = writer.getSortInfo();
- String[] sortColNames = sortInfo.getSortColumnNames();
- org.apache.hadoop.zebra.schema.Schema schema = writer.getSchema();
-
- byte[] types = new byte[sortColNames.length];
- for(int i =0 ; i < sortColNames.length; ++i){
- types[i] = schema.getColumn(sortColNames[i]).getType().pigDataType();
- }
- t = TypesUtils.createTuple(sortColNames.length);
- builder = makeKeyBuilder(types);
- }
-
- inserter = writer.getInserter(name, false);
- }
-
- @Override
- public void close(Reporter reporter) throws IOException {
- inserter.close();
- writer.finish();
- }
-
- private KeyGenerator makeKeyBuilder(byte[] elems) {
- ComparatorExpr[] exprs = new ComparatorExpr[elems.length];
- for (int i = 0; i < elems.length; ++i) {
- exprs[i] = ExprUtils.primitiveComparator(i, elems[i]);
- }
- return new KeyGenerator(ExprUtils.tupleComparator(exprs));
- }
-
- @Override
- public void write(BytesWritable key, Tuple value) throws IOException {
- if (sortColIndices != null)
- {
- for(int i =0; i < sortColIndices.length;++i) {
- t.set(i, value.get(sortColIndices[i]));
- }
- key = builder.generateKey(t);
- } else if (key == null) {
- key = KEY0;
+/**
+ * RecordWriter that inserts Pig tuples into a Zebra BasicTable. If the table has
+ * sort info, each row key is generated from the tuple's sort columns; otherwise
+ * the supplied key is used, or an empty key when it is null.
+ */
+class TableRecordWriter extends RecordWriter<BytesWritable, Tuple> {
+  final private BytesWritable KEY0 = new BytesWritable(new byte[0]);
+  private BasicTable.Writer writer;
+  private TableInserter inserter;
+  // Tuple positions of the sort columns; null when the table has no sort info.
+  private int[] sortColIndices = null;
+  // Key generator and reusable scratch tuple, set only for sorted tables.
+  KeyGenerator builder;
+  Tuple t;
+
+  /**
+   * Opens the table at the path stored under TableStorer.OUTPUT_PATH and
+   * creates an inserter named after this task's ID.
+   */
+  public TableRecordWriter(TaskAttemptContext taContext) throws IOException {
+    Configuration conf = taContext.getConfiguration();
+
+    String path = conf.get(TableStorer.OUTPUT_PATH);
+    writer = new BasicTable.Writer( new Path( path ), conf );
+
+    SortInfo sortInfo = writer.getSortInfo();
+    if (sortInfo != null) {
+      sortColIndices = sortInfo.getSortIndices();
+      String[] sortColNames = sortInfo.getSortColumnNames();
+      org.apache.hadoop.zebra.schema.Schema schema = writer.getSchema();
+
+      byte[] types = new byte[sortColNames.length];
+      for (int i = 0; i < sortColNames.length; ++i) {
+        types[i] = schema.getColumn(sortColNames[i]).getType().pigDataType();
+      }
+      t = TypesUtils.createTuple(sortColNames.length);
+      builder = makeKeyBuilder(types);
+    }
+
+    // NOTE(review): "patition" looks like a typo for "partition"; kept as-is
+    // because the string becomes part of the inserter name.
+    inserter = writer.getInserter( "patition-" + taContext.getTaskAttemptID().getTaskID().getId(), false );
+  }
+
+  /** Closes the inserter, then finishes the table writer. */
+  @Override
+  public void close(TaskAttemptContext taContext) throws IOException {
+    inserter.close();
+    writer.finish();
    }
- inserter.insert(key, value);
- }
+
+  // Builds a KeyGenerator over a tuple comparator with one primitive
+  // comparator per sort column type.
+  private KeyGenerator makeKeyBuilder(byte[] elems) {
+    ComparatorExpr[] exprs = new ComparatorExpr[elems.length];
+    for (int i = 0; i < elems.length; ++i) {
+      exprs[i] = ExprUtils.primitiveComparator(i, elems[i]);
+    }
+    return new KeyGenerator(ExprUtils.tupleComparator(exprs));
+  }
+
+  /**
+   * Inserts value into the table. For sorted tables the key argument is
+   * ignored and replaced by a key generated from the sort columns.
+   */
+  @Override
+  public void write(BytesWritable key, Tuple value) throws IOException {
+    if (sortColIndices != null) {
+      for (int i = 0; i < sortColIndices.length; ++i) {
+        t.set(i, value.get(sortColIndices[i]));
+      }
+      key = builder.generateKey(t);
+    } else if (key == null) {
+      key = KEY0;
+    }
+    inserter.insert(key, value);
+  }
+
}
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java Sat Feb 13 00:06:15 2010
@@ -776,8 +776,9 @@
keyentries.add(keys[j]);
}
}
+ } else {
+ result.add( new ColumnSchema(pn.mName, null, ColumnType.ANY ) );
}
- else result.add(null);
}
return result;
}
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs.java Sat Feb 13 00:06:15 2010
@@ -91,19 +91,18 @@
static String inputPath;
static String inputFileName = "multi-input.txt";
- protected static ExecType execType = ExecType.MAPREDUCE;
+ //protected static ExecType execType = ExecType.MAPREDUCE;
+ protected static ExecType execType = ExecType.LOCAL;
private static MiniCluster cluster;
protected static PigServer pigServer;
- // private static Path pathWorking, pathTable1, path2, path3,
- // pathTable4, pathTable5;
private static Configuration conf;
public static String sortKey = null;
private static FileSystem fs;
- private static String zebraJar;
- private static String whichCluster;
- private static String multiLocs;
+ private static String zebraJar = null;
+ private static String whichCluster = null;
+ private static String multiLocs = null;
private static String strTable1 = null;
private static String strTable2 = null;
private static String strTable3 = null;
@@ -112,52 +111,54 @@
public static void setUpOnce() throws IOException {
if (System.getenv("hadoop.log.dir") == null) {
String base = new File(".").getPath(); // getAbsolutePath();
- System
- .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+ System.setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
}
+ /* By default, we use miniCluster */
if (System.getProperty("whichCluster") == null) {
- System.setProperty("whichCluster", "realCluster");
- System.out.println("should be called");
- whichCluster = System.getProperty("whichCluster");
+ whichCluster = "miniCluster";
+ System.setProperty("whichCluster", "miniCluster");
} else {
whichCluster = System.getProperty("whichCluster");
}
- System.out.println("clusterddddd: " + whichCluster);
- System.out.println(" get env hadoop home: " + System.getenv("HADOOP_HOME"));
- System.out.println(" get env user name: " + System.getenv("USER"));
- if ((whichCluster.equalsIgnoreCase("realCluster") && System
- .getenv("HADOOP_HOME") == null)) {
- System.out.println("Please set HADOOP_HOME");
- System.exit(0);
- }
+ System.out.println("cluster: " + whichCluster);
+
+ if (whichCluster.equals("realCluster")) {
+ System.out.println(" get env hadoop home: " + System.getenv("HADOOP_HOME"));
+ System.out.println(" get env user name: " + System.getenv("USER"));
+
+ if (System.getenv("HADOOP_HOME") == null) {
+ System.out.println("Please set HADOOP_HOME for realCluster testing mode");
+ System.exit(0);
+ }
+
+ if (System.getenv("USER") == null) {
+ System.out.println("Please set USER for realCluster testing mode");
+ System.exit(0);
+ }
+
+ zebraJar = System.getenv("HADOOP_HOME") + "/lib/zebra.jar";
+
+ File file = new File(zebraJar);
+ if (!file.exists()) {
+ System.out.println("Please place zebra.jar at $HADOOP_HOME/lib");
+ System.exit(0);
+ }
+ }
conf = new Configuration();
-
- if ((whichCluster.equalsIgnoreCase("realCluster") && System.getenv("USER") == null)) {
- System.out.println("Please set USER");
- System.exit(0);
- }
- zebraJar = System.getenv("HADOOP_HOME") + "/lib/zebra.jar";
-
- File file = new File(zebraJar);
- if (!file.exists() && whichCluster.equalsIgnoreCase("realCluster")) {
- System.out.println("Please put zebra.jar at hadoop_home/lib");
- System.exit(0);
- }
-
+
// set inputPath and output path
String workingDir = null;
- if (whichCluster.equalsIgnoreCase("realCluster")) {
- inputPath = new String("/user/" + System.getenv("USER") + "/"
- + inputFileName);
+ if( whichCluster.equals("realCluster")) {
+ inputPath = new String("/user/" + System.getenv("USER") + "/" + inputFileName);
System.out.println("inputPath: " + inputPath);
multiLocs = new String("/user/" + System.getenv("USER") + "/" + "us"
+ "," + "/user/" + System.getenv("USER") + "/" + "india" + ","
+ "/user/" + System.getenv("USER") + "/" + "japan");
+
fs = new Path(inputPath).getFileSystem(conf);
-
} else {
RawLocalFileSystem rawLFS = new RawLocalFileSystem();
fs = new LocalFileSystem(rawLFS);
@@ -167,7 +168,9 @@
multiLocs = new String(workingDir + "/" + "us" + "," + workingDir + "/"
+ "india" + "," + workingDir + "/" + "japan");
}
+
writeToFile(inputPath);
+
// check inputPath existence
File inputFile = new File(inputPath);
if (!inputFile.exists() && whichCluster.equalsIgnoreCase("realCluster")) {
@@ -181,8 +184,8 @@
System.exit(0);
}
- if (whichCluster.equalsIgnoreCase("realCluster")) {
- pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil
+ if (whichCluster.equals("realCluster")) {
+ pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil
.toProperties(conf));
pigServer.registerJar(zebraJar);
@@ -231,35 +234,35 @@
public static void writeToFile (String inputFile) throws IOException{
if (whichCluster.equalsIgnoreCase("miniCluster")){
- FileWriter fstream = new FileWriter(inputFile);
- BufferedWriter out = new BufferedWriter(fstream);
- out.write("us 2\n");
- out.write("japan 2\n");
- out.write("india 4\n");
- out.write("us 2\n");
- out.write("japan 1\n");
- out.write("india 3\n");
- out.write("nouse 5\n");
- out.write("nowhere 4\n");
- out.close();
- }
- if (whichCluster.equalsIgnoreCase("realCluster")){
- FSDataOutputStream fout = fs.create(new Path (inputFile));
- fout.writeBytes("us 2\n");
- fout.writeBytes("japan 2\n");
- fout.writeBytes("india 4\n");
- fout.writeBytes("us 2\n");
- fout.writeBytes("japan 1\n");
- fout.writeBytes("india 3\n");
- fout.writeBytes("nouse 5\n");
- fout.writeBytes("nowhere 4\n");
- fout.close();
+ FileWriter fstream = new FileWriter(inputFile);
+ BufferedWriter out = new BufferedWriter(fstream);
+ out.write("us 2\n");
+ out.write("japan 2\n");
+ out.write("india 4\n");
+ out.write("us 2\n");
+ out.write("japan 1\n");
+ out.write("india 3\n");
+ out.write("nouse 5\n");
+ out.write("nowhere 4\n");
+ out.close();
+ } else if( whichCluster.equalsIgnoreCase("realCluster") ) {
+ FSDataOutputStream fout = fs.create(new Path (inputFile));
+ fout.writeBytes("us 2\n");
+ fout.writeBytes("japan 2\n");
+ fout.writeBytes("india 4\n");
+ fout.writeBytes("us 2\n");
+ fout.writeBytes("japan 1\n");
+ fout.writeBytes("india 3\n");
+ fout.writeBytes("nouse 5\n");
+ fout.writeBytes("nowhere 4\n");
+ fout.close();
}
}
public Path generateOutPath(String currentMethod) {
Path outPath = null;
- if (whichCluster.equalsIgnoreCase("realCluster")) {
+ if (whichCluster.equalsIgnoreCase("realCluster") ||
+ whichCluster.equalsIgnoreCase("miniCluster") ) {
outPath = new Path("/user/" + System.getenv("USER") + "/multiOutput/"
+ currentMethod);
} else {
@@ -272,7 +275,7 @@
public void removeDir(Path outPath) throws IOException {
String command = null;
- if (whichCluster.equalsIgnoreCase("realCluster")) {
+ if (whichCluster.equals("realCluster")) {
command = System.getenv("HADOOP_HOME") + "/bin/hadoop fs -rmr "
+ outPath.toString();
} else {
@@ -510,7 +513,7 @@
System.out.println("hello sort on word and count");
String methodName = getCurrentMethodName();
String myMultiLocs = null;
- if (whichCluster.equalsIgnoreCase("realCluster")) {
+ if (whichCluster.equalsIgnoreCase("realCluster") ) {
myMultiLocs = new String("/user/" + System.getenv("USER") + "/" + "us"
+ methodName + "," + "/user/" + System.getenv("USER") + "/" + "india"
+ methodName + "," + "/user/" + System.getenv("USER") + "/" + "japan"
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2.java Sat Feb 13 00:06:15 2010
@@ -90,7 +90,8 @@
static String inputPath;
static String inputFileName = "multi-input.txt";
- protected static ExecType execType = ExecType.MAPREDUCE;
+ //protected static ExecType execType = ExecType.MAPREDUCE;
+ protected static ExecType execType = ExecType.LOCAL;
private static MiniCluster cluster;
protected static PigServer pigServer;
// private static Path pathWorking, pathTable1, path2, path3,
@@ -116,7 +117,7 @@
}
if (System.getProperty("whichCluster") == null) {
- System.setProperty("whichCluster", "realCluster");
+ System.setProperty("whichCluster", "miniCluster");
System.out.println("should be called");
whichCluster = System.getProperty("whichCluster");
} else {
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2TypedApi.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2TypedApi.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2TypedApi.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2TypedApi.java Sat Feb 13 00:06:15 2010
@@ -92,7 +92,8 @@
static String inputPath;
static String inputFileName = "multi-input.txt";
- protected static ExecType execType = ExecType.MAPREDUCE;
+ //protected static ExecType execType = ExecType.MAPREDUCE;
+ protected static ExecType execType = ExecType.LOCAL;
private static MiniCluster cluster;
protected static PigServer pigServer;
// private static Path pathWorking, pathTable1, path2, path3,
@@ -118,7 +119,7 @@
}
if (System.getProperty("whichCluster") == null) {
- System.setProperty("whichCluster", "realCluster");
+ System.setProperty("whichCluster", "miniCluster");
System.out.println("should be called");
whichCluster = System.getProperty("whichCluster");
} else {
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3.java Sat Feb 13 00:06:15 2010
@@ -90,7 +90,8 @@
static String inputPath;
static String inputFileName = "multi-input.txt";
- protected static ExecType execType = ExecType.MAPREDUCE;
+ //protected static ExecType execType = ExecType.MAPREDUCE;
+ protected static ExecType execType = ExecType.LOCAL;
private static MiniCluster cluster;
protected static PigServer pigServer;
// private static Path pathWorking, pathTable1, path2, path3,
@@ -116,7 +117,7 @@
}
if (System.getProperty("whichCluster") == null) {
- System.setProperty("whichCluster", "realCluster");
+ System.setProperty("whichCluster", "miniCluster");
System.out.println("should be called");
whichCluster = System.getProperty("whichCluster");
} else {
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3TypedApi.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3TypedApi.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3TypedApi.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3TypedApi.java Sat Feb 13 00:06:15 2010
@@ -92,7 +92,8 @@
static String inputPath;
static String inputFileName = "multi-input.txt";
- protected static ExecType execType = ExecType.MAPREDUCE;
+ //protected static ExecType execType = ExecType.MAPREDUCE;
+ protected static ExecType execType = ExecType.LOCAL;
private static MiniCluster cluster;
protected static PigServer pigServer;
// private static Path pathWorking, pathTable1, path2, path3,
@@ -118,7 +119,7 @@
}
if (System.getProperty("whichCluster") == null) {
- System.setProperty("whichCluster", "realCluster");
+ System.setProperty("whichCluster", "miniCluster");
System.out.println("should be called");
whichCluster = System.getProperty("whichCluster");
} else {
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4.java Sat Feb 13 00:06:15 2010
@@ -90,7 +90,8 @@
static String inputPath;
static String inputFileName = "multi-input.txt";
- protected static ExecType execType = ExecType.MAPREDUCE;
+ //protected static ExecType execType = ExecType.MAPREDUCE;
+ protected static ExecType execType = ExecType.LOCAL;
private static MiniCluster cluster;
protected static PigServer pigServer;
// private static Path pathWorking, pathTable1, path2, path3,
@@ -116,7 +117,7 @@
}
if (System.getProperty("whichCluster") == null) {
- System.setProperty("whichCluster", "realCluster");
+ System.setProperty("whichCluster", "miniCluster");
System.out.println("should be called");
whichCluster = System.getProperty("whichCluster");
} else {
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4TypedApi.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4TypedApi.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4TypedApi.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4TypedApi.java Sat Feb 13 00:06:15 2010
@@ -92,7 +92,7 @@
static String inputPath;
static String inputFileName = "multi-input.txt";
- protected static ExecType execType = ExecType.MAPREDUCE;
+ protected static ExecType execType = ExecType.LOCAL;
private static MiniCluster cluster;
protected static PigServer pigServer;
// private static Path pathWorking, pathTable1, path2, path3,
@@ -118,7 +118,7 @@
}
if (System.getProperty("whichCluster") == null) {
- System.setProperty("whichCluster", "realCluster");
+ System.setProperty("whichCluster", "miniCluster");
System.out.println("should be called");
whichCluster = System.getProperty("whichCluster");
} else {
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputsTypeApi.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputsTypeApi.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputsTypeApi.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputsTypeApi.java Sat Feb 13 00:06:15 2010
@@ -93,7 +93,8 @@
static String inputPath;
static String inputFileName = "multi-input.txt";
- protected static ExecType execType = ExecType.MAPREDUCE;
+ //protected static ExecType execType = ExecType.MAPREDUCE;
+ protected static ExecType execType = ExecType.LOCAL;
private static MiniCluster cluster;
protected static PigServer pigServer;
// private static Path pathWorking, pathTable1, path2, path3,
@@ -119,7 +120,7 @@
}
if (System.getProperty("whichCluster") == null) {
- System.setProperty("whichCluster", "realCluster");
+ System.setProperty("whichCluster", "miniCluster");
System.out.println("should be called");
whichCluster = System.getProperty("whichCluster");
} else {
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputsTypedApiNeg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputsTypedApiNeg.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputsTypedApiNeg.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputsTypedApiNeg.java Sat Feb 13 00:06:15 2010
@@ -92,7 +92,8 @@
static String inputPath;
static String inputFileName = "multi-input.txt";
- protected static ExecType execType = ExecType.MAPREDUCE;
+ //protected static ExecType execType = ExecType.MAPREDUCE;
+ protected static ExecType execType = ExecType.LOCAL;
private static MiniCluster cluster;
protected static PigServer pigServer;
// private static Path pathWorking, pathTable1, path2, path3,
@@ -118,7 +119,7 @@
}
if (System.getProperty("whichCluster") == null) {
- System.setProperty("whichCluster", "realCluster");
+ System.setProperty("whichCluster", "miniCluster");
System.out.println("should be called");
whichCluster = System.getProperty("whichCluster");
} else {
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestTypedApi.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestTypedApi.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestTypedApi.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestTypedApi.java Sat Feb 13 00:06:15 2010
@@ -103,7 +103,7 @@
static String inputPath;
static String inputFileName = "multi-input.txt";
- protected static ExecType execType = ExecType.MAPREDUCE;
+ protected static ExecType execType = ExecType.LOCAL;
private static MiniCluster cluster;
protected static PigServer pigServer;
// private static Path pathWorking, pathTable1, path2, path3,
@@ -129,7 +129,7 @@
}
if (System.getProperty("whichCluster") == null) {
- System.setProperty("whichCluster", "realCluster");
+ System.setProperty("whichCluster", "miniCluster");
System.out.println("should be called");
whichCluster = System.getProperty("whichCluster");
} else {
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestTypedApi2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestTypedApi2.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestTypedApi2.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestTypedApi2.java Sat Feb 13 00:06:15 2010
@@ -101,7 +101,7 @@
static String inputPath;
static String inputFileName = "multi-input.txt";
- protected static ExecType execType = ExecType.MAPREDUCE;
+ protected static ExecType execType = ExecType.LOCAL;
private static MiniCluster cluster;
protected static PigServer pigServer;
// private static Path pathWorking, pathTable1, path2, path3,
@@ -127,7 +127,7 @@
}
if (System.getProperty("whichCluster") == null) {
- System.setProperty("whichCluster", "realCluster");
+ System.setProperty("whichCluster", "miniCluster");
System.out.println("should be called");
whichCluster = System.getProperty("whichCluster");
} else {
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ArticleGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ArticleGenerator.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ArticleGenerator.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ArticleGenerator.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.zebra.tfile.RandomDistribution.DiscreteRNG;
+import org.apache.hadoop.zebra.tfile.RandomDistribution.Flat;
+
+/**
+ * Generates synthetic text files for tests. Each file consists of paragraphs
+ * of lines built from random dictionary words; the generator keeps running
+ * word and line counts so tests can check their output against
+ * {@link #getSummary()}.
+ */
+class ArticleGenerator {
+  Random random;
+  Dictionary dict;
+  int pageWidth;
+  // Length of the last line of each paragraph.
+  DiscreteRNG lastLineLenGen;
+  // Length of every other line of a paragraph.
+  DiscreteRNG paragraphLineLenGen;
+  // Number of lines per paragraph.
+  DiscreteRNG paragraphLenGen;
+  // Words and lines written since construction or the last resetSummary().
+  long wordCount;
+  long lineCount;
+
+  /**
+   * Create an article generator.
+   *
+   * @param dictWordCnt
+   *          Number of words in the dictionary.
+   * @param minWordLen
+   *          Minimum word length
+   * @param maxWordLen
+   *          Maximum word length
+   * @param pageWidth
+   *          Target line width in characters; a line may run past it by
+   *          the last word appended.
+   */
+  ArticleGenerator(int dictWordCnt, int minWordLen, int maxWordLen,
+      int pageWidth) {
+    random = new Random(System.nanoTime());
+    dict = new Dictionary(random, dictWordCnt, minWordLen, maxWordLen, 100);
+    this.pageWidth = pageWidth;
+    lastLineLenGen = new Flat(random, 1, pageWidth);
+    paragraphLineLenGen = new Flat(random, pageWidth * 3 / 4, pageWidth);
+    paragraphLenGen = new Flat(random, 1, 40);
+  }
+
+  /**
+   * Create an article: write paragraphs of random words, separated by empty
+   * lines, to a new file until its size reaches {@code length} bytes.
+   *
+   * @param fs
+   *          File system.
+   * @param path
+   *          path of the file; it is created without overwriting.
+   * @param length
+   *          Expected size of the file. The file may overshoot it by up to
+   *          one line plus a paragraph separator.
+   * @throws IOException
+   *           if the file cannot be created or written. The stream is
+   *           closed even when writing fails.
+   */
+  void createArticle(FileSystem fs, Path path, long length) throws IOException {
+    FSDataOutputStream fsdos = fs.create(path, false);
+    try {
+      StringBuilder sb = new StringBuilder();
+      int remainLinesInParagraph = paragraphLenGen.nextInt();
+      while (fsdos.getPos() < length) {
+        if (remainLinesInParagraph == 0) {
+          // Paragraph finished: start a new one after an empty line.
+          remainLinesInParagraph = paragraphLenGen.nextInt();
+          fsdos.write('\n');
+        }
+        int lineLen = paragraphLineLenGen.nextInt();
+        if (--remainLinesInParagraph == 0) {
+          lineLen = lastLineLenGen.nextInt();
+        }
+        sb.setLength(0);
+        // Append words until the target length is reached; the last word
+        // appended may push the line past lineLen.
+        while (sb.length() < lineLen) {
+          if (sb.length() > 0) {
+            sb.append(' ');
+          }
+          sb.append(dict.nextWord());
+          ++wordCount;
+        }
+        sb.append('\n');
+        fsdos.write(sb.toString().getBytes());
+        ++lineCount;
+      }
+    } finally {
+      fsdos.close();
+    }
+  }
+
+  /**
+   * Create a bunch of files under the same directory. File {@code i} is named
+   * {@code prefix} followed by {@code i} zero-padded to six digits.
+   * (The misspelled method name is kept for existing callers.)
+   *
+   * @param fs
+   *          File system
+   * @param parent
+   *          directory where files should be created
+   * @param prefix
+   *          prefix name of the files
+   * @param n
+   *          total number of files
+   * @param length
+   *          length of each file.
+   * @throws IOException
+   */
+  void batchArticalCreation(FileSystem fs, Path parent, String prefix, int n,
+      long length) throws IOException {
+    for (int i = 0; i < n; ++i) {
+      createArticle(fs, new Path(parent, String.format("%s%06d", prefix, i)),
+          length);
+    }
+  }
+
+  /** Snapshot of the statistics collected by an ArticleGenerator. */
+  static class Summary {
+    long wordCount;
+    long lineCount;
+    // Occurrence count of every word generated at least once.
+    Map<String, Long> wordCntDist;
+
+    Summary() {
+      wordCntDist = new HashMap<String, Long>();
+    }
+  }
+
+  /** Reset the word, line and per-word counts to zero. */
+  void resetSummary() {
+    wordCount = 0;
+    lineCount = 0;
+    dict.resetWordCnts();
+  }
+
+  /** Return the statistics gathered since construction or the last reset. */
+  Summary getSummary() {
+    Summary ret = new Summary();
+    ret.wordCount = wordCount;
+    ret.lineCount = lineCount;
+    ret.wordCntDist = dict.getWordCounts();
+    return ret;
+  }
+}
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/Dictionary.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/Dictionary.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/Dictionary.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/Dictionary.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.hadoop.zebra.tfile.RandomDistribution.Binomial;
+import org.apache.hadoop.zebra.tfile.RandomDistribution.DiscreteRNG;
+import org.apache.hadoop.zebra.tfile.RandomDistribution.Zipf;
+
+/**
+ * A dictionary of random lowercase words, whose frequency follows a Zipf
+ * distribution and whose length follows a Binomial distribution. It also
+ * counts how often each word has been returned by {@link #nextWord()}.
+ */
+class Dictionary {
+  private static final double BINOMIAL_P = 0.3;
+  private static final double SIGMA = 1.1;
+  // Lower bound of the Zipf range; chosen in the constructor so that the
+  // most/least frequent word ratio is roughly freqRatio.
+  private final int lead;
+  private final Zipf zipf;
+  private final String[] dict;
+  // wordCnts[i] is how many times nextWord() returned dict[i].
+  private final long[] wordCnts;
+
+  /** Build a word of random letters a-z whose length is drawn from rng. */
+  private static String makeWord(DiscreteRNG rng, Random random) {
+    int len = rng.nextInt();
+    StringBuilder sb = new StringBuilder(len);
+    for (int i = 0; i < len; ++i) {
+      sb.append((char) ('a' + random.nextInt(26)));
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Constructor
+   *
+   * @param random
+   *          Source of randomness for word lengths, letters and frequencies.
+   * @param entries
+   *          How many words exist in the dictionary. Must not exceed the
+   *          number of distinct words the length distribution can produce,
+   *          otherwise the duplicate-rejection loop never terminates.
+   * @param minWordLen
+   *          Minimum word length.
+   * @param maxWordLen
+   *          Maximum word length.
+   * @param freqRatio
+   *          Expected ratio between the most frequent words and the least
+   *          frequent words. (e.g. 100)
+   */
+  public Dictionary(Random random, int entries, int minWordLen, int maxWordLen,
+      int freqRatio) {
+    Binomial binomial = new Binomial(random, minWordLen, maxWordLen, BINOMIAL_P);
+    lead = Math.max(0,
+        (int) (entries / (Math.exp(Math.log(freqRatio) / SIGMA) - 1)) - 1);
+    // Use SIGMA here too: lead is derived from it, so the two must agree.
+    zipf = new Zipf(random, lead, entries + lead, SIGMA);
+    dict = new String[entries];
+    // Use a set to ensure no dup words in dictionary; regenerate on collision.
+    Set<String> dictTmp = new HashSet<String>();
+    for (int i = 0; i < entries; ++i) {
+      String word;
+      do {
+        word = makeWord(binomial, random);
+      } while (!dictTmp.add(word));
+      dict[i] = word;
+    }
+    wordCnts = new long[dict.length];
+  }
+
+  /**
+   * Get the next word from the dictionary, drawn from the Zipf distribution,
+   * and count it.
+   *
+   * @return The next word from the dictionary.
+   */
+  public String nextWord() {
+    // The Zipf range starts at lead; shift back to a 0-based index into dict.
+    int index = zipf.nextInt() - lead;
+    ++wordCnts[index];
+    return dict[index];
+  }
+
+  /** Reset every per-word count to zero. */
+  public void resetWordCnts() {
+    Arrays.fill(wordCnts, 0L);
+  }
+
+  /**
+   * Return a new map from each word returned at least once since construction
+   * or the last {@link #resetWordCnts()} to how often it was returned.
+   */
+  public Map<String, Long> getWordCounts() {
+    Map<String, Long> ret = new HashMap<String, Long>();
+    for (int i = 0; i < dict.length; ++i) {
+      if (wordCnts[i] > 0) {
+        ret.put(dict[i], wordCnts[i]);
+      }
+    }
+    return ret;
+  }
+}
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TableMRSample.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TableMRSample.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TableMRSample.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TableMRSample.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.Tuple;
+
+/**
+ * This is a complete MapReduce sample for Table. It doesn't contain the
+ * 'read' part, but that should be similar and easier to write. Refer to the
+ * test cases in the same directory.
+ *
+ * Assume the input files contain rows of word and count, separated by a space:
+ *
+ * <pre>
+ * this 2
+ * is 1
+ * a 4
+ * test 2
+ * hello 1
+ * world 3
+ * </pre>
+ *
+ * Rows that do not have exactly that form are skipped and counted in the
+ * {@code MALFORMED_LINES} counter.
+ */
+public class TableMRSample {
+  /** Job counters reported by {@link MapClass}. */
+  enum Counters {
+    MALFORMED_LINES
+  }
+
+  static class MapClass extends
+      Mapper<LongWritable, Text, BytesWritable, Tuple> {
+    // Reused across map() calls to avoid per-record allocation.
+    private BytesWritable bytesKey;
+    private Tuple tupleRow;
+
+    @Override
+    public void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+
+      // value should contain "word count"
+      String[] wdct = value.toString().split(" ");
+      if (wdct.length != 2) {
+        context.getCounter(Counters.MALFORMED_LINES).increment(1);
+        return;
+      }
+
+      int count;
+      try {
+        count = Integer.parseInt(wdct[1]);
+      } catch (NumberFormatException e) {
+        // A non-numeric count is just another malformed row; skip it rather
+        // than failing the whole task.
+        context.getCounter(Counters.MALFORMED_LINES).increment(1);
+        return;
+      }
+
+      byte[] word = wdct[0].getBytes("UTF-8");
+      bytesKey.set(word, 0, word.length);
+      // Store the parsed string directly; no bytes -> String round trip.
+      tupleRow.set(0, wdct[0]);
+      tupleRow.set(1, count);
+
+      context.write(bytesKey, tupleRow);
+    }
+
+    /** Allocate the reusable key and a row matching the job's output schema. */
+    @Override
+    public void setup(Context context) {
+      bytesKey = new BytesWritable();
+      try {
+        Schema outSchema = BasicTableOutputFormat.getSchema(context);
+        tupleRow = TypesUtils.createTuple(outSchema);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      } catch (ParseException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Configure the sample job and run it, waiting for completion. Exits with
+   * status 0 on success and 1 on failure.
+   *
+   * @param args
+   *          unused; the input and output paths are hard-coded.
+   */
+  public static void main(String[] args) throws ParseException, IOException,
+      InterruptedException, ClassNotFoundException {
+    Job job = new Job();
+    job.setJobName("tableMRSample");
+    Configuration conf = job.getConfiguration();
+    conf.set("table.output.tfile.compression", "gz");
+
+    // input settings
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setMapperClass(TableMRSample.MapClass.class);
+    FileInputFormat.setInputPaths(job, new Path(
+        "/user/joe/inputdata/input.txt"));
+
+    // output settings
+    Path outPath = new Path("/user/joe/outputdata/");
+    job.setOutputFormatClass(BasicTableOutputFormat.class);
+    BasicTableOutputFormat.setOutputPath(job, outPath);
+    // set the logical schema with 2 columns
+    BasicTableOutputFormat.setSchema(job, "word:string, count:int");
+    // for demo purposes, create 2 physical column groups
+    BasicTableOutputFormat.setStorageHint(job, "[word];[count]");
+
+    // set map-only job.
+    job.setNumReduceTasks(0);
+    // submit() alone would return immediately without reporting the outcome.
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
+  }
+}