Posted to commits@pig.apache.org by ya...@apache.org on 2010/02/13 01:06:17 UTC

svn commit: r909667 [3/9] - in /hadoop/pig/branches/load-store-redesign/contrib/zebra: ./ src/java/org/apache/hadoop/zebra/ src/java/org/apache/hadoop/zebra/io/ src/java/org/apache/hadoop/zebra/mapred/ src/java/org/apache/hadoop/zebra/mapreduce/ src/ja...

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java Sat Feb 13 00:06:15 2010
@@ -19,15 +19,11 @@
 package org.apache.hadoop.zebra.pig;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,558 +32,385 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.zebra.io.BasicTable;
-import org.apache.hadoop.zebra.mapred.TableInputFormat;
-import org.apache.hadoop.zebra.mapred.TableRecordReader;
+import org.apache.hadoop.zebra.mapreduce.TableInputFormat;
+import org.apache.hadoop.zebra.mapreduce.TableRecordReader;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.schema.ColumnType;
+import org.apache.hadoop.zebra.schema.Schema;
 import org.apache.hadoop.zebra.schema.Schema.ColumnSchema;
 import org.apache.hadoop.zebra.types.Projection;
-import org.apache.hadoop.zebra.types.TypesUtils;
 import org.apache.hadoop.zebra.types.SortInfo;
-import org.apache.pig.ExecType;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.Slice;
-import org.apache.pig.Slicer;
-import org.apache.pig.backend.datastorage.DataStorage;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.data.DataBag;
+import org.apache.pig.Expression;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.LoadPushDown;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.DefaultTupleFactory;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.optimizer.PruneColumns;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.hadoop.zebra.pig.comparator.*;
 import org.apache.pig.IndexableLoadFunc;
-import org.apache.hadoop.zebra.io.TableScanner;
 
-/**
- * Pig IndexableLoadFunc and Slicer for Zebra Table
- */
+/**
+ * Pig IndexableLoadFunc for Zebra Table, with LoadMetadata and LoadPushDown support.
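+ *
+ * A minimal sketch of usage from a Pig script; the paths and column names below
+ * are illustrative only:
+ *
+ *   A = LOAD '/path/to/table' USING org.apache.hadoop.zebra.pig.TableLoader('c1, c2');
+ *   B = LOAD '/path/to/table' USING org.apache.hadoop.zebra.pig.TableLoader('c1', 'sorted');
+ */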
-public class TableLoader implements IndexableLoadFunc, Slicer {
-	static final Log LOG = LogFactory.getLog(TableLoader.class);
-	private TableInputFormat inputFormat;
-	private JobConf jobConf;
-	private String projectionString;
-	private Path[] paths;
-	private TableRecordReader indexReader = null;
-	private BytesWritable indexKey = null;
-    private Tuple tuple;
-    private org.apache.hadoop.zebra.schema.Schema schema;  
+public class TableLoader extends IndexableLoadFunc implements LoadMetadata, LoadPushDown {
+    static final Log LOG = LogFactory.getLog(TableLoader.class);
+
+    private static final String UDFCONTEXT_PROJ_STRING = "zebra.UDFContext.projectionString";
+
+    private String projectionString;
+
+    private TableRecordReader tableRecordReader = null;
+
+    private Schema schema;  
     private SortInfo sortInfo;
     private boolean sorted = false;
-    private org.apache.hadoop.zebra.schema.Schema projectionSchema;
-	/**
-	 * default constructor
-	 */
-	public TableLoader() {
-		inputFormat = new TableInputFormat();
-	}
-
-	/**
-	 * @param projectionStr
-	 * 		  projection string passed from pig query.
-	 */
-	public TableLoader(String projectionStr) {
-		inputFormat = new TableInputFormat();
-		projectionString = projectionStr;	  
-	}
-
-  /**
-	 * @param projectionStr
-	 * 		  projection string passed from pig query.
-   * @param sorted
-   *      need sorted table(s)?
-	 */
-	public TableLoader(String projectionStr, String sorted) throws IOException {
-      inputFormat = new TableInputFormat();
-      if (projectionStr != null && !projectionStr.isEmpty())
-        projectionString = projectionStr;	  
-      if (sorted.equalsIgnoreCase("sorted"))
-        this.sorted = true;
-      else
-        throw new IOException("Invalid argument to the table loader constructor: "+sorted);
-	}
-
-	@Override
-	public void initialize(Configuration conf) throws IOException
-	{
-	  if (conf == null)
-	    throw new IOException("Null Configuration passed.");
-	  jobConf = new JobConf(conf);
-	}
-	
-	@Override
-	public void bindTo(String filePaths, BufferedPositionedInputStream is,
-			long offset, long end) throws IOException {
-
-      FileInputFormat.setInputPaths(jobConf, filePaths);
-      Path[] paths = FileInputFormat.getInputPaths(jobConf);
-			/**
-			 * Performing glob pattern matching
-			 */
-			List<Path> result = new ArrayList<Path>(paths.length);
-			for (Path p : paths) {
-				FileSystem fs = p.getFileSystem(jobConf);
-				FileStatus[] matches = fs.globStatus(p);
-				if (matches == null) {
-					LOG.warn("Input path does not exist: " + p);
-				}
-				else if (matches.length == 0) {
-					LOG.warn("Input Pattern " + p + " matches 0 files");
-				} else {
-					for (FileStatus globStat: matches) {
-						if (globStat.isDir()) {
-							result.add(globStat.getPath());
-						} else {
-							LOG.warn("Input path " + p + " is not a directory");
-						}
-					}
-				}
-			}
-			if (result.isEmpty()) {
-				throw new IOException("No table specified for input");
-			}
-			TableInputFormat.setInputPaths(jobConf, result.toArray(new Path[result.size()]));
-
-      TableInputFormat.requireSortedTable(jobConf, null);
-      sortInfo = TableInputFormat.getSortInfo(jobConf);
-      schema = TableInputFormat.getSchema(jobConf);
-      int numcols = schema.getNumColumns();
-      tuple = TypesUtils.createTuple(numcols);      
-      setProjection();
-      /*
-       * Use all columns of a table as a projection: not an optimal approach
-       * No need to call TableInputFormat.setProjection: by default use all columns
-       */
-      try {
-        indexReader = TableInputFormat.getTableRecordReader(jobConf, null);
-      } catch (ParseException e) {
-    	  throw new IOException("Exception from TableInputFormat.getTableRecordReader: "+e.getMessage());
-      }
-      indexKey = new BytesWritable();
+    private Schema projectionSchema;
+    private String udfContextSignature = null;
+    
+    private Configuration conf = null;
+
+    private KeyGenerator keyGenerator = null;
+
+    /**
+     * default constructor
+     */
+    public TableLoader() {
     }
 
-  @Override
-  public void seekNear(Tuple t) throws IOException
-  {
-		// SortInfo sortInfo =  inputFormat.getSortInfo(conf, path);
-		String[] sortColNames = sortInfo.getSortColumnNames();
-		byte[] types = new byte[sortColNames.length];
-		for(int i =0 ; i < sortColNames.length; ++i){
-			types[i] = schema.getColumn(sortColNames[i]).getType().pigDataType();
-		}
-		KeyGenerator builder = makeKeyBuilder(types);
-		BytesWritable key = builder.generateKey(t);
-//	    BytesWritable key = new BytesWritable(((String) t.get(0)).getBytes());
-		indexReader.seekTo(key);
-  }
-
-  private KeyGenerator makeKeyBuilder(byte[] elems) {
-	    ComparatorExpr[] exprs = new ComparatorExpr[elems.length];
-	    for (int i = 0; i < elems.length; ++i) {
-	      exprs[i] = ExprUtils.primitiveComparator(i, elems[i]);
-	    }
-	    return new KeyGenerator(ExprUtils.tupleComparator(exprs));
-  }  
-	/**
-	 * @param storage
-	 * @param location
-	 *        The location format follows the same convention as
-	 *        FileInputFormat's comma-separated multiple path format.
-	 * @throws IOException
-	*/
-	private void checkConf(DataStorage storage, String location) throws IOException {
-		if (jobConf == null) {
-			Configuration conf =
-				ConfigurationUtil.toConfiguration(storage.getConfiguration());
-			jobConf = new JobConf(conf);
-			jobConf.setInputFormat(TableInputFormat.class);			
-			
-			// TODO: the following code may better be moved to TableInputFormat.
-			// Hack: use FileInputFormat to decode comma-separated multiple path
-			// format.
-			
-			FileInputFormat.setInputPaths(jobConf, location);
-			paths = FileInputFormat.getInputPaths(jobConf);
-			
-			/**
-			 * Performing glob pattern matching
-			 */
-			List<Path> result = new ArrayList<Path>(paths.length);
-			for (Path p : paths) {
-				FileSystem fs = p.getFileSystem(jobConf);
-				FileStatus[] matches = fs.globStatus(p);
-				if (matches == null) {
-					throw new IOException("Input path does not exist: " + p);
-				}
-				else if (matches.length == 0) {
-					LOG.warn("Input Pattern " + p + " matches 0 files");
-				} else {
-					for (FileStatus globStat: matches) {
-						if (globStat.isDir()) {
-							result.add(globStat.getPath());
-						} else {
-							LOG.warn("Input path " + p + " is not a directory");
-						}
-					}
-				}
-			}
-			if (result.isEmpty()) {
-				throw new IOException("No table specified for input");
-			}
-			
-			LOG.info("Total input tables to process : " + result.size()); 
-			TableInputFormat.setInputPaths(jobConf, result.toArray(new Path[result.size()]));
-			if (sorted)
-				TableInputFormat.requireSortedTable(jobConf, null);
-		}
-	}
-
-	
-	private void setProjection() throws IOException {
-		try {
-			
-			String pigLoadSignature = jobConf.get("pig.loader.signature");
-			Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
-			String prunedProjStr = null;
-			if( pigLoadSignature != null)
-				prunedProjStr = p.getProperty(pigLoadSignature);
-			
-			if(prunedProjStr != null ) {
-				TableInputFormat.setProjection(jobConf, prunedProjStr);
-			} else {
-				if (projectionString != null) {    		  
-					TableInputFormat.setProjection(jobConf, projectionString);
-				}
-			}
-		} catch (ParseException e) {
-			throw new IOException("Schema parsing failed : "+e.getMessage());
-		}
-
-		
-		
-	}
-	
-	@Override
-	public Schema determineSchema(String fileName, ExecType execType,
-			DataStorage storage) throws IOException {
-		
-		checkConf(storage, fileName);
-		
-		// This is bad but its done for pig. Pig creates one loadfunc object and uses to different
-		// signatures. Zebra does not modify jobConf object once created. However, we might have the new
-		// signature in this function everytime. 
-		String pigLoadSignature = storage.getConfiguration().getProperty("pig.loader.signature");
-		if( pigLoadSignature != null) {
-			jobConf.set("pig.loader.signature", pigLoadSignature);
-		}	
-		setProjection();
-		
-		Projection projection;
-
-    org.apache.hadoop.zebra.schema.Schema tschema = TableInputFormat.getSchema(jobConf);
-    try {
-      projection = new org.apache.hadoop.zebra.types.Projection(tschema, TableInputFormat.getProjection(jobConf));
-      projectionSchema = projection.getProjectionSchema();
-    } catch (ParseException e) {
-      throw new IOException("Schema parsing failed : "+e.getMessage());
+    /**
+     * @param projectionStr
+     *           projection string passed from pig query.
+     */
+    public TableLoader(String projectionStr) {
+        if( projectionStr != null && !projectionStr.isEmpty() )
+            projectionString = projectionStr;
     }
 
-		if (projectionSchema == null) {
-			throw new IOException("Cannot determine table projection schema");
-		}
-    
-		try {
-			return SchemaConverter.toPigSchema(projectionSchema);
-		} catch (FrontendException e) {
-			throw new IOException("FrontendException", e);
-		}
-	}
-
-	@Override
-    public RequiredFieldResponse fieldsToRead(RequiredFieldList requiredFieldList) throws FrontendException {
-
-		
-		String pigLoadSignature = requiredFieldList.getSignature();
-		if(pigLoadSignature == null) {
-			throw new FrontendException("Zebra Cannot have null loader signature in fieldsToRead");
-		}	
-		
-		List<RequiredField> rFields = requiredFieldList.getFields();
-		if( rFields == null) {
-			throw new FrontendException("requiredFieldList.getFields() can not return null in fieldsToRead");
-		}	
-
-		Iterator<RequiredField> it= rFields.iterator();
-		String projectionStr = "";
-		
-		while( it.hasNext()) {
-			RequiredField rField = (RequiredField) it.next();
-			ColumnSchema cs = projectionSchema.getColumn(rField.getIndex());
-			
-			if(cs == null) {
-				throw new FrontendException
-				("Null column schema in projection schema in fieldsToRead at index " + rField.getIndex()); 
-			}
-			
-		    if(cs.getType() != ColumnType.MAP && (rField.getSubFields() != null)) {    	
-		    	throw new FrontendException
-		    	("Zebra cannot have subfields for a non-map column type in fieldsToRead " + 
-		    	 "ColumnType:" + cs.getType() + " index in zebra projection schema: " + rField.getIndex()		
-		    	);
-		    }
-		    String name = cs.getName();
-	    	projectionStr = projectionStr + name ;
-		    if(cs.getType() == ColumnType.MAP) {    	
-		    	List<RequiredField> subFields = rField.getSubFields();
-		    	
-		    	if( subFields != null ) {
-		    	
-    		    	Iterator<RequiredField> its= subFields.iterator();
-	    	    	boolean flag = false;
-		        	if(its.hasNext()) {
-		        		flag = true;
-		    	    	projectionStr += "#" + "{";
-		        	}	
-		        	String tmp = "";
-		        	while(its.hasNext()) {
-		        		RequiredField sField = (RequiredField) its.next();	
-		        		tmp = tmp + sField.getAlias();
-		        		if(its.hasNext()) {
-		        			tmp = tmp + "|";
-		        		}
-		        	}  
-		        	if ( flag) {
-		        		projectionStr = projectionStr + tmp + "}";
-		        	}
-		    	}	
-		    }
-	    	if(it.hasNext()) {
-	    		projectionStr = projectionStr + " , ";
-	    	}
-		}
-		Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
-		
-		if(p == null) {
-			throw new FrontendException("Zebra Cannot have null UDFCOntext property");
-		}	
-		
-		if(projectionStr != null && (projectionStr != ""))
-			p.setProperty(pigLoadSignature, projectionStr);
-				
-		RequiredFieldResponse rfr = new RequiredFieldResponse(true);
-		
-		return rfr;		
-	}
-
-	@Override
-	public Tuple getNext() throws IOException {
-      if (indexReader.atEnd())
-        return null;
-      indexReader.next(indexKey, tuple);
-      return tuple;
-	}
-
-  @Override
-  public void close() throws IOException {
-    if (indexReader != null)
-      indexReader.close();
-  }
-
-	@Override
-	public Slice[] slice(DataStorage store, String location) throws IOException {
-		
-		checkConf(store, location);
-		setProjection();
-		// TableInputFormat accepts numSplits < 0 (special case for no-hint)
-		InputSplit[] splits = inputFormat.getSplits(jobConf, -1);
-
-		Slice[] slices = new Slice[splits.length];
-		for (int nx = 0; nx < slices.length; nx++) {
-			slices[nx] = new TableSlice(jobConf, splits[nx], sorted);
-		}
-
-		return slices;
-	}
-
-	@Override
-	public void validate(DataStorage store, String location) throws IOException {
-		checkConf(store, location);
-	}
-  
-	static class TableSlice implements Slice {
-		private static final long serialVersionUID = 1L;
-		private static final Class[] emptyArray = new Class[] {};
-    
-		private TreeMap<String, String> configMap;
-		private InputSplit split;
-    
-		transient private JobConf conf;
-		transient private int numProjCols = 0;
-		transient private RecordReader<BytesWritable, Tuple> scanner;
-		transient private BytesWritable key;
-    transient private boolean sorted = false;
-
-		TableSlice(JobConf conf, InputSplit split, boolean sorted) {
-			// hack: expecting JobConf contains nothing but a <string, string>
-			// key-value pair store.
-			configMap = new TreeMap<String, String>();
-			for (Iterator<Map.Entry<String, String>> it = conf.iterator(); it.hasNext();) {
-				Map.Entry<String, String> e = it.next();
-				configMap.put(e.getKey(), e.getValue());
-			}
-			
-			
-			
-			this.split = split;
-			this.sorted = sorted;
-		}
-
-		@Override
-		public void close() throws IOException {
-			if (scanner == null) {
-				throw new IOException("Slice not initialized");
-			}
-			scanner.close();
-		}
-
-		@Override
-		public long getLength() {
-			try {
-				return split.getLength();
-			} catch (IOException e) {
-				throw new RuntimeException("IOException", e);
-			}
-		}
-
-		@Override
-		public String[] getLocations() {
-			try {
-				return split.getLocations();
-			} catch (IOException e) {
-				throw new RuntimeException("IOException", e);
-			}
-		}
-
-		@Override
-		public long getPos() throws IOException {
-			if (scanner == null) {
-				throw new IOException("Slice not initialized");
-			}
-			return scanner.getPos();
-		}
-
-		@Override
-		public float getProgress() throws IOException {
-			if (scanner == null) {
-				throw new IOException("Slice not initialized");
-			}
-			return scanner.getProgress();
-		}
-
-		@Override
-		public long getStart() {
-			return 0;
-		}
-
-		@Override
-		public void init(DataStorage store) throws IOException {
-			Configuration localConf = new Configuration();
-			for (Iterator<Map.Entry<String, String>> it =
-				configMap.entrySet().iterator(); it.hasNext();) {
-				Map.Entry<String, String> e = it.next();
-				localConf.set(e.getKey(), e.getValue());
-			}
-			conf = new JobConf(localConf);
-			String projection;			
-			try
-			{
-				projection = TableInputFormat.getProjection(conf);
-			} catch (ParseException e) {
-				throw new IOException("Schema parsing failed :"+e.getMessage());
-			}
-			numProjCols = Projection.getNumColumns(projection);
-			TableInputFormat inputFormat = new TableInputFormat();
-			if (sorted)
-				TableInputFormat.requireSortedTable(conf, null);
-			scanner = inputFormat.getRecordReader(split, conf, Reporter.NULL);
-			key = new BytesWritable();
-		}
-
-		@Override
-		public boolean next(Tuple value) throws IOException {
-			
-			TypesUtils.formatTuple(value, numProjCols);
-			return scanner.next(key, value);
-		}
-    
-		private void writeObject(ObjectOutputStream out) throws IOException {
-			out.writeObject(configMap);
-			out.writeObject(split.getClass().getName());
-			split.write(out);
-		} 
+    /**
+     * @param projectionStr
+     *           projection string passed from pig query.
+     * @param sorted
+     *      need sorted table(s)?
+     */
+    public TableLoader(String projectionStr, String sorted) throws IOException {
+        this( projectionStr );
+        
+        if( sorted.equalsIgnoreCase( "sorted" ) )
+            this.sorted = true;
+        else
+            throw new IOException( "Invalid argument to the table loader constructor: " + sorted );
+    }
+
+    @Override
+    public void initialize(Configuration conf) throws IOException {
+        // Ugly workaround for a problem in Pig: the passed-in conf contains a value for
+        // TableInputFormat.INPUT_PROJ that was set by the left-table execution in a merge-join
+        // case. To undo that side effect, we copy everything except that entry.
+        this.conf = new Configuration( false );
+        Iterator<Map.Entry<String, String>> it = conf.iterator();
+        while( it.hasNext() ) {
+            Map.Entry<String, String> entry = it.next();
+            String key = entry.getKey();
+            if( key.equals( "mapreduce.lib.table.input.projection" ) ) // The string const is defined in TableInputFormat.
+                continue;
+            this.conf.set( entry.getKey(), entry.getValue() );
+        }
+
+        tableRecordReader = createIndexReader();
+
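+        // Build a key generator over the table's sort columns so that seekNear() can
+        // translate a Pig tuple into the table's binary sort key.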
+        String[] sortColNames = sortInfo.getSortColumnNames();
+        byte[] types = new byte[sortColNames.length];
+        for(int i =0 ; i < sortColNames.length; ++i){
+            types[i] = schema.getColumn(sortColNames[i]).getType().pigDataType();
+        }
+        keyGenerator = makeKeyBuilder( types );
+    }
+
+    /**
+     * This method is called only once.
+     */
+    @Override
+    public void seekNear(Tuple tuple) throws IOException {
+        BytesWritable key = keyGenerator.generateKey( tuple );
+        tableRecordReader.seekTo( key );
+    }
     
-		@SuppressWarnings("unchecked")
-		private void readObject(ObjectInputStream in) throws IOException,
-        	ClassNotFoundException {
-			configMap = (TreeMap<String, String>) in.readObject();
-			String className = (String) in.readObject();
-			Class<InputSplit> clazz = (Class<InputSplit>) Class.forName(className);
-			try {
-				Constructor<InputSplit> meth = clazz.getDeclaredConstructor(emptyArray);
-				meth.setAccessible(true);
-				split = meth.newInstance();
-			} catch (Exception e) {
-				throw new ClassNotFoundException("Cannot create instance", e);
-			}
-			split.readFields(in);
-		}
-	}
-
-	@Override
-	public DataBag bytesToBag(byte[] b) throws IOException {
-		throw new IOException("Not implemented");
-	}
-
-	@Override
-	public String bytesToCharArray(byte[] b) throws IOException {
-		throw new IOException("Not implemented");
-	}
-
-	@Override
-	public Double bytesToDouble(byte[] b) throws IOException {
-		throw new IOException("Not implemented");
-	}
-
-	@Override
-	public Float bytesToFloat(byte[] b) throws IOException {
-		throw new IOException("Not implemented");
-	}
-
-	@Override
-	public Integer bytesToInteger(byte[] b) throws IOException {
-		throw new IOException("Not implemented");
-	}
-
-	@Override
-	public Long bytesToLong(byte[] b) throws IOException {
-		throw new IOException("Not implemented");
-	}
-
-	public Map<String, Object> bytesToMap(byte[] b) throws IOException {
-		throw new IOException("Not implemented");
-	}
-
-	@Override
-	public Tuple bytesToTuple(byte[] b) throws IOException {
-		throw new IOException("Not implemented");
-	}
+    private TableRecordReader createIndexReader() throws IOException {
+        Job job = new Job( conf );
+
+        // Obtain the schema and sort info. For the index reader, the table must be sorted.
+        schema = TableInputFormat.getSchema( job );
+        sorted = true;
+        
+        setProjection( job );
+
+        try {
+            return TableInputFormat.createTableRecordReader( job, TableInputFormat.getProjection( job ) );
+        } catch(ParseException ex) {
+            throw new IOException( "Exception from TableInputFormat.createTableRecordReader: " + ex.getMessage() );
+        } catch(InterruptedException ex) {
+            throw new IOException( "Exception from TableInputFormat.createTableRecordReader: " + ex.getMessage() );
+        }
+    }
+
+    /**
+     * This method does more than set the projection: it also grabs the sorting info when a sorted table is required.
+     * 
+     * @param job
+     * @throws IOException
+     */
+    private void setProjection(Job job) throws IOException {
+        if( sorted ) {
+            TableInputFormat.requireSortedTable( job, null );
+            sortInfo = TableInputFormat.getSortInfo( job );
+        }
+        
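+        // A projection pruned via pushProjection() and stored in the UDFContext takes
+        // precedence over the projection string passed to the constructor.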
+        try {
+            Properties properties = UDFContext.getUDFContext().getUDFProperties( 
+                    this.getClass(), new String[]{ udfContextSignature } );
+            String prunedProjStr = properties.getProperty( UDFCONTEXT_PROJ_STRING );
+            
+            if( prunedProjStr != null ) {
+                TableInputFormat.setProjection( job, prunedProjStr );
+            } else if( projectionString != null ) {              
+                TableInputFormat.setProjection( job, projectionString );
+            }
+        } catch (ParseException ex) {
+            throw new IOException( "Schema parsing failed : " + ex.getMessage() );
+        }
+    }
+
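+    /**
+     * Builds a KeyGenerator that combines one comparator expression per sort column,
+     * in sort-column order, into a single composite key builder.
+     */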
+    private KeyGenerator makeKeyBuilder(byte[] elems) {
+        ComparatorExpr[] exprs = new ComparatorExpr[elems.length];
+        for (int i = 0; i < elems.length; ++i) {
+            exprs[i] = ExprUtils.primitiveComparator(i, elems[i]);
+        }
+        return new KeyGenerator(ExprUtils.tupleComparator(exprs));
+    }  
+
+    /*
+     * Hack: use FileInputFormat to decode comma-separated multiple path format.
+     */ 
+     private static Path[] getPathsFromLocation(String location, Configuration conf) throws IOException {
+         Job j = new Job( conf );
+         FileInputFormat.setInputPaths( j, location );
+         Path[] paths = FileInputFormat.getInputPaths( j );
+
+         /**
+          * Performing glob pattern matching
+          */
+         List<Path> result = new ArrayList<Path>(paths.length);
+         for (Path p : paths) {
+             FileSystem fs = p.getFileSystem(conf);
+             FileStatus[] matches = fs.globStatus(p);
+             if( matches == null ) {
+                 throw new IOException("Input path does not exist: " + p);
+             } else if (matches.length == 0) {
+                 LOG.warn("Input Pattern " + p + " matches 0 files");
+             } else {
+                 for (FileStatus globStat: matches) {
+                     if (globStat.isDir()) {
+                         result.add(globStat.getPath());
+                     } else {
+                         LOG.warn("Input path " + p + " is not a directory");
+                     }
+                 }
+             }
+         }
+         
+         if (result.isEmpty()) {
+             throw new IOException("No table specified for input");
+         }
+
+         LOG.info("Total input tables to process : " + result.size()); 
+
+         return result.toArray( new Path[result.size()] );
+     }
+
+     @Override
+     public Tuple getNext() throws IOException {
+         if (tableRecordReader.atEnd())
+             return null;
+         try {
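+             // Copy the current value into a fresh Pig tuple; the underlying reader may
+             // reuse its value object across calls to nextKeyValue().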
+             tableRecordReader.nextKeyValue();
+             ArrayList<Object> fields = new ArrayList<Object>(tableRecordReader.getCurrentValue().getAll());
+             return DefaultTupleFactory.getInstance().newTuple(fields);
+         } catch (InterruptedException ex) {
+             throw new IOException( "InterruptedException:" + ex );
+         }
+     }
+
+     @Override
+     public void close() throws IOException {
+         if (tableRecordReader != null)
+             tableRecordReader.close();
+     }
+
+     @SuppressWarnings("unchecked")
+     @Override
+     public void prepareToRead(org.apache.hadoop.mapreduce.RecordReader reader, PigSplit split)
+     throws IOException {
+         tableRecordReader = (TableRecordReader)reader;
+         if( tableRecordReader == null )
+             throw new IOException( "Invalid object type passed to TableLoader" );
+     }
+
+     @Override
+     public void setLocation(String location, Job job) throws IOException {
+         Path[] paths = getPathsFromLocation( location, job.getConfiguration() );
+         TableInputFormat.setInputPaths( job, paths );
+
+         // The following obviously goes beyond setting the location, but this is the
+         // only place where we can do it, as suggested by the Pig team.
+         setProjection( job );
+     }
+
+     @SuppressWarnings("unchecked")
+     @Override
+     public InputFormat getInputFormat() throws IOException {
+         return new TableInputFormat();
+     }
+
+     @Override
+     public String[] getPartitionKeys(String location, Configuration conf)
+     throws IOException {
+         return null;
+     }
+
+     @Override
+     public ResourceSchema getSchema(String location, Configuration conf) throws IOException {
+         Path[] paths = getPathsFromLocation( location, conf );
+         Job job = new Job(conf);
+         TableInputFormat.setInputPaths( job, paths );
+
+         Schema tableSchema = null;
+         if( paths.length == 1 ) {
+             tableSchema = BasicTable.Reader.getSchema( paths[0], job.getConfiguration() );
+         } else {
+             tableSchema = new Schema();
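+             // With multiple input tables, the result schema is the union of the
+             // individual table schemas.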
+             for (Path p : paths) {
+                 Schema schema = BasicTable.Reader.getSchema( p, job.getConfiguration() );
+                 try {
+                     tableSchema.unionSchema( schema );
+                 } catch (ParseException e) {
+                     throw new IOException(e.getMessage());
+                 }
+             }
+         }
+         
+         setProjection( job );
+
+         projectionSchema = tableSchema;
+         try {
+             Projection projection = new org.apache.hadoop.zebra.types.Projection( tableSchema, 
+                     TableInputFormat.getProjection( job ) );
+             projectionSchema = projection.getProjectionSchema();
+         } catch (ParseException e) {
+             throw new IOException( "Schema parsing failed : "+ e.getMessage() );
+         }
+
+         if( projectionSchema == null ) {
+             throw new IOException( "Cannot determine table projection schema" );
+         }
+
+         return SchemaConverter.convertToResourceSchema( projectionSchema );
+     }
+
+     @Override
+     public ResourceStatistics getStatistics(String location, Configuration conf)
+     throws IOException {
+         // Statistics is not supported.
+         return null;
+     }
+
+     @Override
+     public void setPartitionFilter(Expression partitionFilter)
+     throws IOException {
+         // No-op. It should never be called, since getPartitionKeys() returns null.
+     }
+
+     @Override
+     public List<OperatorSet> getFeatures() {
+         List<OperatorSet> features = new ArrayList<OperatorSet>(1);
+         features.add( LoadPushDown.OperatorSet.PROJECTION );
+         return features;
+     }
+
+     @Override
+     public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
+     throws FrontendException {
+            List<RequiredField> rFields = requiredFieldList.getFields();
+            if( rFields == null) {
+                throw new FrontendException("requiredFieldList.getFields() cannot return null in pushProjection");
+            }    
+
+            String projectionStr = "";
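+            // The generated Zebra projection string looks like "c1 , c2#{k1|k2}": a map
+            // column may carry a #{...} list of the requested keys.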
+            
+            Iterator<RequiredField> it= rFields.iterator();
+            while( it.hasNext() ) {
+                RequiredField rField = (RequiredField) it.next();
+                ColumnSchema cs = projectionSchema.getColumn(rField.getIndex());
+                
+                if(cs == null) {
+                    throw new FrontendException
+                    ("Null column schema in projection schema in pushProjection at index " + rField.getIndex());
+                }
+                
+                if(cs.getType() != ColumnType.MAP && (rField.getSubFields() != null)) {        
+                    throw new FrontendException
+                    ("Zebra cannot have subfields for a non-map column type in pushProjection " + 
+                     "ColumnType:" + cs.getType() + " index in zebra projection schema: " + rField.getIndex()
+                    );
+                }
+                String name = cs.getName();
+                projectionStr = projectionStr + name ;
+                if(cs.getType() == ColumnType.MAP) {        
+                    List<RequiredField> subFields = rField.getSubFields();
+                    
+                    if( subFields != null ) {
+                    
+                        Iterator<RequiredField> its= subFields.iterator();
+                        boolean flag = false;
+                        if(its.hasNext()) {
+                            flag = true;
+                            projectionStr += "#" + "{";
+                        }    
+                        String tmp = "";
+                        while(its.hasNext()) {
+                            RequiredField sField = (RequiredField) its.next();    
+                            tmp = tmp + sField.getAlias();
+                            if(its.hasNext()) {
+                                tmp = tmp + "|";
+                            }
+                        }  
+                        if( flag ) {
+                            projectionStr = projectionStr + tmp + "}";
+                        }
+                    }    
+                }
+                if(it.hasNext()) {
+                    projectionStr = projectionStr + " , ";
+                }
+            }
+            
+            Properties properties = UDFContext.getUDFContext().getUDFProperties( 
+                    this.getClass(), new String[]{ udfContextSignature } );
+            if( projectionStr != null && !projectionStr.isEmpty() )
+                properties.setProperty( UDFCONTEXT_PROJ_STRING, projectionStr );
+
+            return new RequiredFieldResponse( true );
+     }
+
+        @Override
+        public void setUDFContextSignature(String signature) {
+            udfContextSignature = signature;
+        }
 }

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java Sat Feb 13 00:06:15 2010
@@ -19,19 +19,17 @@
 package org.apache.hadoop.zebra.pig;
 
 import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.Constructor;
-import java.util.List;
+import java.util.Properties;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.zebra.pig.comparator.ComparatorExpr;
 import org.apache.hadoop.zebra.pig.comparator.ExprUtils;
 import org.apache.hadoop.zebra.pig.comparator.KeyGenerator;
@@ -40,66 +38,132 @@
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.types.SortInfo;
 import org.apache.hadoop.zebra.types.TypesUtils;
-import org.apache.pig.StoreConfig;
-import org.apache.pig.CommittableStoreFunc;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreMetadata;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
 
 /**
- * Pig CommittableStoreFunc for Zebra Table
+ * Pig StoreFunc implementation for Zebra Table
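+ *
+ * A minimal sketch of usage from a Pig script; the path and the '[c1, c2]'
+ * column-group storage hint are illustrative only:
+ *
+ *   STORE A INTO '/path/to/table' USING org.apache.hadoop.zebra.pig.TableStorer('[c1, c2]');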
  */
-public class TableStorer implements CommittableStoreFunc {
-	private String storageHintString;
+public class TableStorer implements StoreFunc, StoreMetadata {
+    private static final String UDFCONTEXT_OUTPUT_SCHEMA = "zebra.UDFContext.outputSchema";
+    private static final String UDFCONTEXT_SORT_INFO = "zebra.UDFContext.sortInfo";
+
+    static final String OUTPUT_STORAGEHINT = "mapreduce.lib.table.output.storagehint";
+    static final String OUTPUT_SCHEMA = "mapreduce.lib.table.output.schema";
+    static final String OUTPUT_PATH = "mapreduce.lib.table.output.dir";
+    static final String SORT_INFO = "mapreduce.lib.table.sort.info";
+
+    private String storageHintString = null;
+    private String udfContextSignature = null;
+    private TableRecordWriter tableRecordWriter = null;
 
-	public TableStorer() {	  
-	}
+    public TableStorer() {
+    }
+
+    public TableStorer(String storageHintStr) throws ParseException, IOException {
+        storageHintString = storageHintStr;
+    }
+
+    @Override
+    public void putNext(Tuple tuple) throws IOException {
+        tableRecordWriter.write( null, tuple );
+    }
+
+    @Override
+    public void checkSchema(ResourceSchema schema) throws IOException {
+        // Get schemaStr and sortColumnNames from the given schema. In the process, we
+        // also validate the schema and sorting info.
+        ResourceSchema.ResourceFieldSchema[] fields = schema.getFields();
+        int[] index = schema.getSortKeys();
+        StringBuilder sortColumnNames = new StringBuilder();
+        for( int i = 0; i< index.length; i++ ) {
+            ResourceFieldSchema field = fields[index[i]];
+            String name = field.getName();
+            if( name == null )
+                throw new IOException("Zebra does not support column positional reference yet");
+            if( !org.apache.pig.data.DataType.isAtomic( field.getType() ) )
+                throw new IOException( "Field [" + name + "] is not of a simple type, as required for sort columns." );
+            if( i > 0 )
+                sortColumnNames.append( "," );
+            sortColumnNames.append( name );
+        }
+
+        // Convert resource schema to zebra schema
+        org.apache.hadoop.zebra.schema.Schema zebraSchema;
+        try {
+            zebraSchema = SchemaConverter.convertFromResourceSchema( schema );
+        } catch (ParseException ex) {
+            throw new IOException("Exception thrown from SchemaConverter: " + ex.getMessage() );
+        }
+
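+        // Stash the schema and sort info in the UDFContext so that they survive the
+        // front-end/back-end boundary; setStoreLocation() later copies them into the
+        // job configuration.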
+        Properties properties = UDFContext.getUDFContext().getUDFProperties( 
+                this.getClass(), new String[]{ udfContextSignature } );
+        properties.setProperty( UDFCONTEXT_OUTPUT_SCHEMA, zebraSchema.toString() );
+        properties.setProperty( UDFCONTEXT_SORT_INFO, sortColumnNames.toString() );
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public org.apache.hadoop.mapreduce.OutputFormat getOutputFormat()
+    throws IOException {
+        return new TableOutputFormat();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void prepareToWrite(RecordWriter writer)
+    throws IOException {
+        tableRecordWriter = (TableRecordWriter)writer;
+        if( tableRecordWriter == null ) {
+            throw new IOException( "Invalid type of writer. Expected type: TableRecordWriter." );
+        }
+    }
 
-	public TableStorer(String storageHintStr) throws ParseException, IOException {
-		storageHintString = storageHintStr;
-	}
-  
-	@Override
-	public void bindTo(OutputStream os) throws IOException {
-		// no op
-	}
-
-	@Override
-	public void finish() throws IOException {
-		// no op
-	}
-
-	@Override
-	public void putNext(Tuple f) throws IOException {
-		// no op
-	}
-
-	@Override
-	public Class getStorePreparationClass() throws IOException {
-		return TableOutputFormat.class;
-	}
-  
-	public String getStorageHintString() {
-		return storageHintString;  
-	}
-  
-	private static final Class[] emptyArray = new Class[] {};
-
-	static public void main(String[] args) throws SecurityException, NoSuchMethodException {
-		Constructor meth = TableOutputFormat.class.getDeclaredConstructor(emptyArray);
-	}
-
-  @Override
-  public void commit(Configuration conf) throws IOException {
-    try {
-      JobConf job = new JobConf(conf);
-      StoreConfig storeConfig = MapRedUtil.getStoreConfig(job);
-      BasicTable.Writer write = new BasicTable.Writer(new Path(storeConfig.getLocation()), job);
-      write.close();
-    } catch (IOException ee) {
-      throw ee;
+    @Override
+    public String relToAbsPathForStoreLocation(String location, Path curDir)
+    throws IOException {
+        return LoadFunc.getAbsolutePath( location, curDir );
     }
-  }
+
+    @Override
+    public void setStoreLocation(String location, Job job) throws IOException {
+        Configuration conf = job.getConfiguration();
+        if( storageHintString != null )
+            conf.set( OUTPUT_STORAGEHINT, storageHintString );
+        conf.set( OUTPUT_PATH, location );
+
+        // Get schema string and sorting info from UDFContext and re-store them to
+        // job config.
+        Properties properties = UDFContext.getUDFContext().getUDFProperties( 
+                this.getClass(), new String[]{ udfContextSignature } );
+        conf.set( OUTPUT_SCHEMA, properties.getProperty( UDFCONTEXT_OUTPUT_SCHEMA ) );
+        conf.set( SORT_INFO, properties.getProperty( UDFCONTEXT_SORT_INFO ) );
+    }
+
+    @Override
+    public void storeSchema(ResourceSchema schema, String location, Configuration conf)
+    throws IOException {
+        // Close the table to finalize it: TableOutputCommitter.cleanupJob() is currently
+        // a no-op, so the final close is done here after the job completes.
+        BasicTable.Writer write = new BasicTable.Writer( new Path( location ), conf );
+        write.close();
+    }
+
+    @Override
+    public void setStoreFuncUDFContextSignature(String signature) {
+        udfContextSignature = signature;
+    }
+
+    @Override
+    public void storeStatistics(ResourceStatistics stats, String location,
+            Configuration conf) throws IOException {
+        // no-op
+    }
+
 }
 
 /**
@@ -107,58 +171,72 @@
  * Table OutputFormat
  * 
  */
-class TableOutputFormat implements OutputFormat<BytesWritable, Tuple> {
-	@Override
-	public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
-		StoreConfig storeConfig = MapRedUtil.getStoreConfig(job);
-		String location = storeConfig.getLocation(), schemaStr;
-    Schema schema = storeConfig.getSchema();
-    org.apache.pig.SortInfo pigSortInfo = storeConfig.getSortInfo();
-
-    /* TODO
-     * use a home-brewn comparator ??
-     */
-    String comparator = null;
-    String sortColumnNames = null;
-    if (pigSortInfo != null)
-    {
-      List<org.apache.pig.SortColInfo> sortColumns = pigSortInfo.getSortColInfoList();
-      StringBuilder sb = new StringBuilder();
-      if (sortColumns != null && sortColumns.size() >0)
-      {
-        org.apache.pig.SortColInfo sortColumn;
-        String sortColumnName;
-        for (int i = 0; i < sortColumns.size(); i++)
-        {
-          sortColumn = sortColumns.get(i);
-          sortColumnName = sortColumn.getColName();
-          if (sortColumnName == null)
-            throw new IOException("Zebra does not support column positional reference yet");
-          if (!org.apache.pig.data.DataType.isAtomic(schema.getField(sortColumnName).type))
-        	  throw new IOException(schema.getField(sortColumnName).alias+" is not of simple type as required for a sort column now.");
-          if (i > 0)
-            sb.append(",");
-          sb.append(sortColumnName);
-        }
-        sortColumnNames = sb.toString();
-      }
-    }
-    try {
-      schemaStr = SchemaConverter.fromPigSchema(schema).toString();
-    } catch (ParseException e) {
-      throw new IOException("Exception thrown from SchemaConverter: " + e.getMessage());
-    }
-		TableStorer storeFunc = (TableStorer)MapRedUtil.getStoreFunc(job);   
-		BasicTable.Writer writer = new BasicTable.Writer(new Path(location), 
-				schemaStr, storeFunc.getStorageHintString(), sortColumnNames, comparator, job);
-		writer.finish();
-	}
-    
-	@Override
-	public RecordWriter<BytesWritable, Tuple> getRecordWriter(FileSystem ignored,
-			JobConf job, String name, Progressable progress) throws IOException {
-		return new TableRecordWriter(name, job);
-	}
+class TableOutputFormat extends OutputFormat<BytesWritable, Tuple> {
+    @Override
+    public void checkOutputSpecs(JobContext job) throws IOException, InterruptedException {
+        Configuration conf = job.getConfiguration();
+        String location = conf.get( TableStorer.OUTPUT_PATH );
+        String schemaStr = conf.get( TableStorer.OUTPUT_SCHEMA );
+        String storageHint = conf.get( TableStorer.OUTPUT_STORAGEHINT );
+        String sortColumnNames = conf.get( TableStorer.SORT_INFO );
+
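+        // Create the table up front so that the output specs (schema, storage hint,
+        // sort info) are validated before any task runs; the table is closed later in
+        // TableStorer.storeSchema().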
+        BasicTable.Writer writer = new BasicTable.Writer( new Path( location ), 
+                schemaStr, storageHint, sortColumnNames, null, conf );
+        writer.finish();
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext taContext)
+    throws IOException, InterruptedException {
+        return new TableOutputCommitter() ;
+    }
+
+    @Override
+    public org.apache.hadoop.mapreduce.RecordWriter<BytesWritable, Tuple> getRecordWriter(
+            TaskAttemptContext taContext) throws IOException, InterruptedException {
+        return new TableRecordWriter( taContext );
+    }
+
+}
+
+// TODO: make corresponding changes for commit and cleanup. Currently, no-ops.
+class TableOutputCommitter extends OutputCommitter {
+    @Override
+    public void abortTask(TaskAttemptContext taContext) throws IOException {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public void cleanupJob(JobContext jobContext) throws IOException {
+//    	Configuration conf = jobContext.getConfiguration();
+//        String location = conf.get( TableStorer.OUTPUT_PATH );
+//        BasicTable.Writer write = new BasicTable.Writer( new Path( location ), conf );
+//        write.close();
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taContext) throws IOException {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taContext) throws IOException {
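+        // Currently there is no per-task commit step; records are written straight to
+        // the table by TableRecordWriter.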
+        return false;
+    }
+
+    @Override
+    public void setupJob(JobContext jobContext) throws IOException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taContext) throws IOException {
+        // TODO Auto-generated method stub
+    }
+
 }
 
 /**
@@ -166,65 +244,64 @@
  * Table RecordWriter
  * 
  */
-class TableRecordWriter implements RecordWriter<BytesWritable, Tuple> {
-	final private BytesWritable KEY0 = new BytesWritable(new byte[0]); 
-	private BasicTable.Writer writer;
-	private TableInserter inserter;
-  private int[] sortColIndices = null;
-  KeyGenerator builder;
-  Tuple t;
-
-	public TableRecordWriter(String name, JobConf conf) throws IOException {
-		StoreConfig storeConfig = MapRedUtil.getStoreConfig(conf);
-		String location = storeConfig.getLocation();
-
-		// TODO: how to get? 1) column group splits; 2) flag of sorted-ness,
-		// compression, etc.
-		writer = new BasicTable.Writer(new Path(location), conf);
-
-    if (writer.getSortInfo() != null)
-    {
-      sortColIndices = writer.getSortInfo().getSortIndices();
-      SortInfo sortInfo =  writer.getSortInfo();
-      String[] sortColNames = sortInfo.getSortColumnNames();
-      org.apache.hadoop.zebra.schema.Schema schema = writer.getSchema();
-
-      byte[] types = new byte[sortColNames.length];
-      for(int i =0 ; i < sortColNames.length; ++i){
-        types[i] = schema.getColumn(sortColNames[i]).getType().pigDataType();
-      }
-      t = TypesUtils.createTuple(sortColNames.length);
-      builder = makeKeyBuilder(types);
-    }
-
-		inserter = writer.getInserter(name, false);
-	}
-
-	@Override
-	public void close(Reporter reporter) throws IOException {
-		inserter.close();
-		writer.finish();
-	}
-
-  private KeyGenerator makeKeyBuilder(byte[] elems) {
-	    ComparatorExpr[] exprs = new ComparatorExpr[elems.length];
-	    for (int i = 0; i < elems.length; ++i) {
-	      exprs[i] = ExprUtils.primitiveComparator(i, elems[i]);
-	    }
-	    return new KeyGenerator(ExprUtils.tupleComparator(exprs));
-  }
-
-  @Override
-  public void write(BytesWritable key, Tuple value) throws IOException {
-    if (sortColIndices != null)
-    {
-      for(int i =0; i < sortColIndices.length;++i) {
-        t.set(i, value.get(sortColIndices[i]));
-      }
-      key = builder.generateKey(t);
-    } else if (key == null) {
-      key = KEY0;
+class TableRecordWriter extends RecordWriter<BytesWritable, Tuple> {
+    final private BytesWritable KEY0 = new BytesWritable(new byte[0]); 
+    private BasicTable.Writer writer;
+    private TableInserter inserter;
+    private int[] sortColIndices = null;
+    KeyGenerator builder;
+    Tuple t;
+
+    public TableRecordWriter(TaskAttemptContext taContext) throws IOException {
+        Configuration conf = taContext.getConfiguration();
+
+        String path = conf.get(TableStorer.OUTPUT_PATH);
+        writer = new BasicTable.Writer( new Path( path ), conf );
+
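+        // For a sorted table, rebuild the key generator from the writer's sort info so
+        // that each record's key can be derived from its sort-column values.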
+        if (writer.getSortInfo() != null)
+        {
+            sortColIndices = writer.getSortInfo().getSortIndices();
+            SortInfo sortInfo =  writer.getSortInfo();
+            String[] sortColNames = sortInfo.getSortColumnNames();
+            org.apache.hadoop.zebra.schema.Schema schema = writer.getSchema();
+
+            byte[] types = new byte[sortColNames.length];
+            for(int i =0 ; i < sortColNames.length; ++i){
+                types[i] = schema.getColumn(sortColNames[i]).getType().pigDataType();
+            }
+            t = TypesUtils.createTuple(sortColNames.length);
+            builder = makeKeyBuilder(types);
+        }
+
+        inserter = writer.getInserter( "partition-" + taContext.getTaskAttemptID().getTaskID().getId(), false );
+    }
+
+    @Override
+    public void close(TaskAttemptContext taContext) throws IOException {
+        inserter.close();
+        writer.finish();
     }
-    inserter.insert(key, value);
-  }
+
+    private KeyGenerator makeKeyBuilder(byte[] elems) {
+        ComparatorExpr[] exprs = new ComparatorExpr[elems.length];
+        for (int i = 0; i < elems.length; ++i) {
+            exprs[i] = ExprUtils.primitiveComparator(i, elems[i]);
+        }
+        return new KeyGenerator(ExprUtils.tupleComparator(exprs));
+    }
+
+    @Override
+    public void write(BytesWritable key, Tuple value) throws IOException {
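+        // When the table is sorted, derive the key from the record's sort-column values;
+        // otherwise use the caller-supplied key, or an empty key if none was given.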
+        if (sortColIndices != null) {
+            for(int i =0; i < sortColIndices.length;++i) {
+                t.set(i, value.get(sortColIndices[i]));
+            }
+            key = builder.generateKey(t);
+        } else if (key == null) {
+            key = KEY0;
+        }
+        inserter.insert(key, value);
+    }
+
 }

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java Sat Feb 13 00:06:15 2010
@@ -776,8 +776,9 @@
             keyentries.add(keys[j]);
           }
         }
+      } else {
+        result.add( new ColumnSchema( pn.mName, null, ColumnType.ANY ) );
       }
-      else result.add(null);
     }
     return result;
   }

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs.java Sat Feb 13 00:06:15 2010
@@ -91,19 +91,18 @@
 
   static String inputPath;
   static String inputFileName = "multi-input.txt";
-  protected static ExecType execType = ExecType.MAPREDUCE;
+  //protected static ExecType execType = ExecType.MAPREDUCE;
+  protected static ExecType execType = ExecType.LOCAL;
   private static MiniCluster cluster;
   protected static PigServer pigServer;
-  // private static Path pathWorking, pathTable1, path2, path3,
-  // pathTable4, pathTable5;
   private static Configuration conf;
   public static String sortKey = null;
 
   private static FileSystem fs;
 
-  private static String zebraJar;
-  private static String whichCluster;
-  private static String multiLocs;
+  private static String zebraJar = null;
+  private static String whichCluster = null;
+  private static String multiLocs = null;
   private static String strTable1 = null;
   private static String strTable2 = null;
   private static String strTable3 = null;
@@ -112,52 +111,54 @@
   public static void setUpOnce() throws IOException {
     if (System.getenv("hadoop.log.dir") == null) {
       String base = new File(".").getPath(); // getAbsolutePath();
-      System
-          .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+      System.setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
     }
 
+    /* By default, we use miniCluster */
     if (System.getProperty("whichCluster") == null) {
-      System.setProperty("whichCluster", "realCluster");
-      System.out.println("should be called");
-      whichCluster = System.getProperty("whichCluster");
+      whichCluster = "miniCluster";
+      System.setProperty("whichCluster", "miniCluster");
     } else {
       whichCluster = System.getProperty("whichCluster");
     }
 
-    System.out.println("clusterddddd: " + whichCluster);
-    System.out.println(" get env hadoop home: " + System.getenv("HADOOP_HOME"));
-    System.out.println(" get env user name: " + System.getenv("USER"));
-    if ((whichCluster.equalsIgnoreCase("realCluster") && System
-        .getenv("HADOOP_HOME") == null)) {
-      System.out.println("Please set HADOOP_HOME");
-      System.exit(0);
-    }
+    System.out.println("cluster: " + whichCluster);
+    
+    if (whichCluster.equals("realCluster")) {
+      System.out.println(" get env hadoop home: " + System.getenv("HADOOP_HOME"));
+      System.out.println(" get env user name: " + System.getenv("USER"));
+      
+      if (System.getenv("HADOOP_HOME") == null) {
+        System.out.println("Please set HADOOP_HOME for realCluster testing mode");
+        System.exit(0);        
+      }
+
+      if (System.getenv("USER") == null) {
+        System.out.println("Please set USER for realCluster testing mode");
+        System.exit(0);        
+      }
+      
+      zebraJar = System.getenv("HADOOP_HOME") + "/lib/zebra.jar";
+
+      File file = new File(zebraJar);
+      if (!file.exists()) {
+        System.out.println("Please place zebra.jar at $HADOOP_HOME/lib");
+        System.exit(0);
+      }
+    }    
 
     conf = new Configuration();
-
-    if ((whichCluster.equalsIgnoreCase("realCluster") && System.getenv("USER") == null)) {
-      System.out.println("Please set USER");
-      System.exit(0);
-    }
-    zebraJar = System.getenv("HADOOP_HOME") + "/lib/zebra.jar";
-
-    File file = new File(zebraJar);
-    if (!file.exists() && whichCluster.equalsIgnoreCase("realCluster")) {
-      System.out.println("Please put zebra.jar at hadoop_home/lib");
-      System.exit(0);
-    }
-
+    
     // set inputPath and output path
     String workingDir = null;
-    if (whichCluster.equalsIgnoreCase("realCluster")) {
-      inputPath = new String("/user/" + System.getenv("USER") + "/"
-          + inputFileName);
+    if (whichCluster.equals("realCluster")) {
+      inputPath = new String("/user/" + System.getenv("USER") + "/" + inputFileName);
       System.out.println("inputPath: " + inputPath);
       multiLocs = new String("/user/" + System.getenv("USER") + "/" + "us"
           + "," + "/user/" + System.getenv("USER") + "/" + "india" + ","
           + "/user/" + System.getenv("USER") + "/" + "japan");
+      
       fs = new Path(inputPath).getFileSystem(conf);
-
     } else {
       RawLocalFileSystem rawLFS = new RawLocalFileSystem();
       fs = new LocalFileSystem(rawLFS);
@@ -167,7 +168,9 @@
       multiLocs = new String(workingDir + "/" + "us" + "," + workingDir + "/"
           + "india" + "," + workingDir + "/" + "japan");
     }
+    
     writeToFile(inputPath);
+    
     // check inputPath existence
     File inputFile = new File(inputPath);
     if (!inputFile.exists() && whichCluster.equalsIgnoreCase("realCluster")) {
@@ -181,8 +184,8 @@
       System.exit(0);
     }
 
-    if (whichCluster.equalsIgnoreCase("realCluster")) {
-      pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil
+    if (whichCluster.equals("realCluster")) {
+      pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil 
           .toProperties(conf));
       pigServer.registerJar(zebraJar);
 
@@ -231,35 +234,35 @@
 
   public static void writeToFile (String inputFile) throws IOException{
     if (whichCluster.equalsIgnoreCase("miniCluster")){
-    FileWriter fstream = new FileWriter(inputFile);
-    BufferedWriter out = new BufferedWriter(fstream);
-    out.write("us 2\n");
-    out.write("japan 2\n");
-    out.write("india 4\n");
-    out.write("us 2\n");
-    out.write("japan 1\n");
-    out.write("india 3\n");
-    out.write("nouse 5\n");
-    out.write("nowhere 4\n");
-    out.close();
-    }
-    if (whichCluster.equalsIgnoreCase("realCluster")){
-    FSDataOutputStream fout = fs.create(new Path (inputFile));
-    fout.writeBytes("us 2\n");
-    fout.writeBytes("japan 2\n");
-    fout.writeBytes("india 4\n");
-    fout.writeBytes("us 2\n");
-    fout.writeBytes("japan 1\n");
-    fout.writeBytes("india 3\n");
-    fout.writeBytes("nouse 5\n");
-    fout.writeBytes("nowhere 4\n");
-    fout.close();
+      FileWriter fstream = new FileWriter(inputFile);
+      BufferedWriter out = new BufferedWriter(fstream);
+      out.write("us 2\n");
+      out.write("japan 2\n");
+      out.write("india 4\n");
+      out.write("us 2\n");
+      out.write("japan 1\n");
+      out.write("india 3\n");
+      out.write("nouse 5\n");
+      out.write("nowhere 4\n");
+      out.close();
+    } else if( whichCluster.equalsIgnoreCase("realCluster") ) {
+      FSDataOutputStream fout = fs.create(new Path (inputFile));
+      fout.writeBytes("us 2\n");
+      fout.writeBytes("japan 2\n");
+      fout.writeBytes("india 4\n");
+      fout.writeBytes("us 2\n");
+      fout.writeBytes("japan 1\n");
+      fout.writeBytes("india 3\n");
+      fout.writeBytes("nouse 5\n");
+      fout.writeBytes("nowhere 4\n");
+      fout.close();
     }
   }
   
   public Path generateOutPath(String currentMethod) {
     Path outPath = null;
-    if (whichCluster.equalsIgnoreCase("realCluster")) {
+    if (whichCluster.equalsIgnoreCase("realCluster") || 
+    		whichCluster.equalsIgnoreCase("miniCluster") ) {
       outPath = new Path("/user/" + System.getenv("USER") + "/multiOutput/"
           + currentMethod);
     } else {
@@ -272,7 +275,7 @@
 
   public void removeDir(Path outPath) throws IOException {
     String command = null;
-    if (whichCluster.equalsIgnoreCase("realCluster")) {
+    if (whichCluster.equals("realCluster")) {
       command = System.getenv("HADOOP_HOME") + "/bin/hadoop fs -rmr "
           + outPath.toString();
     } else {
@@ -510,7 +513,7 @@
     System.out.println("hello sort on word and count");
     String methodName = getCurrentMethodName();
     String myMultiLocs = null;
-    if (whichCluster.equalsIgnoreCase("realCluster")) {
+    if (whichCluster.equalsIgnoreCase("realCluster") ) {
       myMultiLocs = new String("/user/" + System.getenv("USER") + "/" + "us"
           + methodName + "," + "/user/" + System.getenv("USER") + "/" + "india"
           + methodName + "," + "/user/" + System.getenv("USER") + "/" + "japan"

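
The whichCluster/execType switch above is repeated in each of the test files that follow. For reference, a minimal standalone sketch of the selection logic these setUpOnce() methods implement; the class name and main() harness here are illustrative, while the property, environment variables, and messages match the diff:

import java.io.File;

public class ClusterModeCheck {
  public static void main(String[] args) {
    // Default to the in-process mini cluster unless -DwhichCluster=realCluster is given.
    String whichCluster = System.getProperty("whichCluster", "miniCluster");
    System.setProperty("whichCluster", whichCluster);
    System.out.println("cluster: " + whichCluster);

    if (whichCluster.equals("realCluster")) {
      // Real-cluster mode needs HADOOP_HOME and USER set, plus zebra.jar deployed.
      String hadoopHome = System.getenv("HADOOP_HOME");
      if (hadoopHome == null || System.getenv("USER") == null) {
        System.out.println("Please set HADOOP_HOME and USER for realCluster testing mode");
        System.exit(0);
      }
      File zebraJar = new File(hadoopHome + "/lib/zebra.jar");
      if (!zebraJar.exists()) {
        System.out.println("Please place zebra.jar at $HADOOP_HOME/lib");
        System.exit(0);
      }
    }
  }
}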
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2.java Sat Feb 13 00:06:15 2010
@@ -90,7 +90,8 @@
 
   static String inputPath;
   static String inputFileName = "multi-input.txt";
-  protected static ExecType execType = ExecType.MAPREDUCE;
+  //protected static ExecType execType = ExecType.MAPREDUCE;
+  protected static ExecType execType = ExecType.LOCAL;
   private static MiniCluster cluster;
   protected static PigServer pigServer;
   // private static Path pathWorking, pathTable1, path2, path3,
@@ -116,7 +117,7 @@
     }
 
     if (System.getProperty("whichCluster") == null) {
-      System.setProperty("whichCluster", "realCluster");
+      System.setProperty("whichCluster", "miniCluster");
       System.out.println("should be called");
       whichCluster = System.getProperty("whichCluster");
     } else {

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2TypedApi.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2TypedApi.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2TypedApi.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2TypedApi.java Sat Feb 13 00:06:15 2010
@@ -92,7 +92,8 @@
 
   static String inputPath;
   static String inputFileName = "multi-input.txt";
-  protected static ExecType execType = ExecType.MAPREDUCE;
+  //protected static ExecType execType = ExecType.MAPREDUCE;
+  protected static ExecType execType = ExecType.LOCAL;
   private static MiniCluster cluster;
   protected static PigServer pigServer;
   // private static Path pathWorking, pathTable1, path2, path3,
@@ -118,7 +119,7 @@
     }
 
     if (System.getProperty("whichCluster") == null) {
-      System.setProperty("whichCluster", "realCluster");
+      System.setProperty("whichCluster", "miniCluster");
       System.out.println("should be called");
       whichCluster = System.getProperty("whichCluster");
     } else {

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3.java Sat Feb 13 00:06:15 2010
@@ -90,7 +90,8 @@
 
   static String inputPath;
   static String inputFileName = "multi-input.txt";
-  protected static ExecType execType = ExecType.MAPREDUCE;
+  //protected static ExecType execType = ExecType.MAPREDUCE;
+  protected static ExecType execType = ExecType.LOCAL;
   private static MiniCluster cluster;
   protected static PigServer pigServer;
   // private static Path pathWorking, pathTable1, path2, path3,
@@ -116,7 +117,7 @@
     }
 
     if (System.getProperty("whichCluster") == null) {
-      System.setProperty("whichCluster", "realCluster");
+      System.setProperty("whichCluster", "miniCluster");
       System.out.println("should be called");
       whichCluster = System.getProperty("whichCluster");
     } else {

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3TypedApi.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3TypedApi.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3TypedApi.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3TypedApi.java Sat Feb 13 00:06:15 2010
@@ -92,7 +92,8 @@
 
   static String inputPath;
   static String inputFileName = "multi-input.txt";
-  protected static ExecType execType = ExecType.MAPREDUCE;
+  //protected static ExecType execType = ExecType.MAPREDUCE;
+  protected static ExecType execType = ExecType.LOCAL;
   private static MiniCluster cluster;
   protected static PigServer pigServer;
   // private static Path pathWorking, pathTable1, path2, path3,
@@ -118,7 +119,7 @@
     }
 
     if (System.getProperty("whichCluster") == null) {
-      System.setProperty("whichCluster", "realCluster");
+      System.setProperty("whichCluster", "miniCluster");
       System.out.println("should be called");
       whichCluster = System.getProperty("whichCluster");
     } else {

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4.java Sat Feb 13 00:06:15 2010
@@ -90,7 +90,8 @@
 
   static String inputPath;
   static String inputFileName = "multi-input.txt";
-  protected static ExecType execType = ExecType.MAPREDUCE;
+  //protected static ExecType execType = ExecType.MAPREDUCE;
+  protected static ExecType execType = ExecType.LOCAL;
   private static MiniCluster cluster;
   protected static PigServer pigServer;
   // private static Path pathWorking, pathTable1, path2, path3,
@@ -116,7 +117,7 @@
     }
 
     if (System.getProperty("whichCluster") == null) {
-      System.setProperty("whichCluster", "realCluster");
+      System.setProperty("whichCluster", "miniCluster");
       System.out.println("should be called");
       whichCluster = System.getProperty("whichCluster");
     } else {

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4TypedApi.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4TypedApi.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4TypedApi.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4TypedApi.java Sat Feb 13 00:06:15 2010
@@ -92,7 +92,7 @@
 
   static String inputPath;
   static String inputFileName = "multi-input.txt";
-  protected static ExecType execType = ExecType.MAPREDUCE;
+  protected static ExecType execType = ExecType.LOCAL;
   private static MiniCluster cluster;
   protected static PigServer pigServer;
   // private static Path pathWorking, pathTable1, path2, path3,
@@ -118,7 +118,7 @@
     }
 
     if (System.getProperty("whichCluster") == null) {
-      System.setProperty("whichCluster", "realCluster");
+      System.setProperty("whichCluster", "miniCluster");
       System.out.println("should be called");
       whichCluster = System.getProperty("whichCluster");
     } else {

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputsTypeApi.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputsTypeApi.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputsTypeApi.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputsTypeApi.java Sat Feb 13 00:06:15 2010
@@ -93,7 +93,8 @@
 
   static String inputPath;
   static String inputFileName = "multi-input.txt";
-  protected static ExecType execType = ExecType.MAPREDUCE;
+  //protected static ExecType execType = ExecType.MAPREDUCE;
+  protected static ExecType execType = ExecType.LOCAL;
   private static MiniCluster cluster;
   protected static PigServer pigServer;
   // private static Path pathWorking, pathTable1, path2, path3,
@@ -119,7 +120,7 @@
     }
 
     if (System.getProperty("whichCluster") == null) {
-      System.setProperty("whichCluster", "realCluster");
+      System.setProperty("whichCluster", "miniCluster");
       System.out.println("should be called");
       whichCluster = System.getProperty("whichCluster");
     } else {

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputsTypedApiNeg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputsTypedApiNeg.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputsTypedApiNeg.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputsTypedApiNeg.java Sat Feb 13 00:06:15 2010
@@ -92,7 +92,8 @@
 
   static String inputPath;
   static String inputFileName = "multi-input.txt";
-  protected static ExecType execType = ExecType.MAPREDUCE;
+  //protected static ExecType execType = ExecType.MAPREDUCE;
+  protected static ExecType execType = ExecType.LOCAL;
   private static MiniCluster cluster;
   protected static PigServer pigServer;
   // private static Path pathWorking, pathTable1, path2, path3,
@@ -118,7 +119,7 @@
     }
 
     if (System.getProperty("whichCluster") == null) {
-      System.setProperty("whichCluster", "realCluster");
+      System.setProperty("whichCluster", "miniCluster");
       System.out.println("should be called");
       whichCluster = System.getProperty("whichCluster");
     } else {

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestTypedApi.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestTypedApi.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestTypedApi.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestTypedApi.java Sat Feb 13 00:06:15 2010
@@ -103,7 +103,7 @@
 
   static String inputPath;
   static String inputFileName = "multi-input.txt";
-  protected static ExecType execType = ExecType.MAPREDUCE;
+  protected static ExecType execType = ExecType.LOCAL;
   private static MiniCluster cluster;
   protected static PigServer pigServer;
   // private static Path pathWorking, pathTable1, path2, path3,
@@ -129,7 +129,7 @@
     }
 
     if (System.getProperty("whichCluster") == null) {
-      System.setProperty("whichCluster", "realCluster");
+      System.setProperty("whichCluster", "miniCluster");
       System.out.println("should be called");
       whichCluster = System.getProperty("whichCluster");
     } else {

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestTypedApi2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestTypedApi2.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestTypedApi2.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestTypedApi2.java Sat Feb 13 00:06:15 2010
@@ -101,7 +101,7 @@
 
   static String inputPath;
   static String inputFileName = "multi-input.txt";
-  protected static ExecType execType = ExecType.MAPREDUCE;
+  protected static ExecType execType = ExecType.LOCAL;
   private static MiniCluster cluster;
   protected static PigServer pigServer;
   // private static Path pathWorking, pathTable1, path2, path3,
@@ -127,7 +127,7 @@
     }
 
     if (System.getProperty("whichCluster") == null) {
-      System.setProperty("whichCluster", "realCluster");
+      System.setProperty("whichCluster", "miniCluster");
       System.out.println("should be called");
       whichCluster = System.getProperty("whichCluster");
     } else {

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ArticleGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ArticleGenerator.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ArticleGenerator.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ArticleGenerator.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.zebra.tfile.RandomDistribution.DiscreteRNG;
+import org.apache.hadoop.zebra.tfile.RandomDistribution.Flat;
+
+/**
+ * Generate some input text files.
+ */
+class ArticleGenerator {
+	Random random;
+	Dictionary dict;
+	int pageWidth;
+	DiscreteRNG lastLineLenGen;
+	DiscreteRNG paragraphLineLenGen;
+	DiscreteRNG paragraphLenGen;
+	long wordCount;
+	long lineCount;
+
+	/**
+	 * Create an article generator.
+	 * 
+	 * @param dictWordCnt
+	 *          Number of words in the dictionary.
+	 * @param minWordLen
+	 *          Minimum word length
+	 * @param maxWordLen
+	 *          Maximum word length
+	 * @param pageWidth
+	 *          Page width, i.e. the target line width in characters.
+	 */
+	ArticleGenerator(int dictWordCnt, int minWordLen, int maxWordLen,
+			int pageWidth) {
+		random = new Random(System.nanoTime());
+		dict = new Dictionary(random, dictWordCnt, minWordLen, maxWordLen, 100);
+		this.pageWidth = pageWidth;
+		lastLineLenGen = new Flat(random, 1, pageWidth);
+		paragraphLineLenGen = new Flat(random, pageWidth * 3 / 4, pageWidth);
+		paragraphLenGen = new Flat(random, 1, 40);
+	}
+
+	/**
+	 * Create an article
+	 * 
+	 * @param fs
+	 *          File system.
+	 * @param path
+	 *          path of the file
+	 * @param length
+	 *          Expected size of the file.
+	 * @throws IOException
+	 */
+	void createArticle(FileSystem fs, Path path, long length) throws IOException {
+		FSDataOutputStream fsdos = fs.create(path, false);
+		StringBuilder sb = new StringBuilder();
+		int remainLinesInParagraph = paragraphLenGen.nextInt();
+		while (fsdos.getPos() < length) {
+			if (remainLinesInParagraph == 0) {
+				remainLinesInParagraph = paragraphLenGen.nextInt();
+				fsdos.write('\n');
+			}
+			int lineLen = paragraphLineLenGen.nextInt();
+			if (--remainLinesInParagraph == 0) {
+				lineLen = lastLineLenGen.nextInt();
+			}
+			sb.setLength(0);
+			while (sb.length() < lineLen) {
+				if (sb.length() > 0) {
+					sb.append(' ');
+				}
+				sb.append(dict.nextWord());
+				++wordCount;
+			}
+			sb.append('\n');
+			fsdos.write(sb.toString().getBytes());
+			++lineCount;
+		}
+		fsdos.close();
+	}
+
+	/**
+	 * Create a bunch of files under the same directory.
+	 * 
+	 * @param fs
+	 *          File system
+	 * @param parent
+	 *          directory where files should be created
+	 * @param prefix
+	 *          prefix name of the files
+	 * @param n
+	 *          total number of files
+	 * @param length
+	 *          length of each file.
+	 * @throws IOException
+	 */
+	void batchArticalCreation(FileSystem fs, Path parent, String prefix, int n,
+			long length) throws IOException {
+		for (int i = 0; i < n; ++i) {
+			createArticle(fs, new Path(parent, String.format("%s%06d", prefix, i)),
+					length);
+		}
+	}
+
+	static class Summary {
+		long wordCount;
+		long lineCount;
+		Map<String, Long> wordCntDist;
+
+		Summary() {
+			wordCntDist = new HashMap<String, Long>();
+		}
+	}
+
+	void resetSummary() {
+		wordCount = 0;
+		lineCount = 0;
+		dict.resetWordCnts();
+	}
+
+	Summary getSummary() {
+		Summary ret = new Summary();
+		ret.wordCount = wordCount;
+		ret.lineCount = lineCount;
+		ret.wordCntDist = dict.getWordCounts();
+		return ret;
+	}
+}
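
A minimal usage sketch for the generator above, writing a few files to the local file system. The demo class, paths, and sizes are illustrative only; the method name batchArticalCreation is spelled as in the source. Since ArticleGenerator is package-private, the sketch assumes it lives in the same package:

package org.apache.hadoop.zebra.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ArticleGeneratorDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    // 1000-word dictionary, word lengths 2..10, 80-character page width.
    ArticleGenerator gen = new ArticleGenerator(1000, 2, 10, 80);
    // Four ~1 MB files named article-000000 .. article-000003.
    gen.batchArticalCreation(fs, new Path("/tmp/articles"), "article-", 4, 1 << 20);
    ArticleGenerator.Summary s = gen.getSummary();
    System.out.println("words=" + s.wordCount + ", lines=" + s.lineCount);
  }
}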

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/Dictionary.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/Dictionary.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/Dictionary.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/Dictionary.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.hadoop.zebra.tfile.RandomDistribution.Binomial;
+import org.apache.hadoop.zebra.tfile.RandomDistribution.DiscreteRNG;
+import org.apache.hadoop.zebra.tfile.RandomDistribution.Zipf;
+
+/**
+ * A dictionary of randomly generated words whose frequencies follow a Zipf
+ * distribution and whose lengths follow a binomial distribution.
+ */
+class Dictionary {
+	private static final double BINOMIAL_P = 0.3;
+	private static final double SIGMA = 1.1;
+	private final int lead;
+	private final Zipf zipf;
+	private final String[] dict;
+	private final long[] wordCnts;
+
+	private static String makeWord(DiscreteRNG rng, Random random) {
+		int len = rng.nextInt();
+		StringBuilder sb = new StringBuilder(len);
+		for (int i = 0; i < len; ++i) {
+			sb.append((char) ('a' + random.nextInt(26)));
+		}
+		return sb.toString();
+	}
+
+	/**
+	 * Constructor
+	 * 
+	 * @param entries
+	 *          How many words exist in the dictionary.
+	 * @param minWordLen
+	 *          Minimum word length.
+	 * @param maxWordLen
+	 *          Maximum word length.
+	 * @param freqRatio
+	 *          Expected ratio between the most frequent words and the least
+	 *          frequent words. (e.g. 100)
+	 */
+	public Dictionary(Random random, int entries, int minWordLen, int maxWordLen,
+			int freqRatio) {
+		Binomial binomial = new Binomial(random, minWordLen, maxWordLen, BINOMIAL_P);
+		lead = Math.max(0,
+				(int) (entries / (Math.exp(Math.log(freqRatio) / SIGMA) - 1)) - 1);
+		zipf = new Zipf(random, lead, entries + lead, SIGMA);
+		dict = new String[entries];
+		// Use a set to ensure no dup words in dictionary
+		Set<String> dictTmp = new HashSet<String>();
+		for (int i = 0; i < entries; ++i) {
+			while (true) {
+				String word = makeWord(binomial, random);
+				if (!dictTmp.contains(word)) {
+					dictTmp.add(word);
+					dict[i] = word;
+					break;
+				}
+			}
+		}
+		wordCnts = new long[dict.length];
+	}
+
+	/**
+	 * Get the next word from the dictionary.
+	 * 
+	 * @return The next word from the dictionary.
+	 */
+	public String nextWord() {
+		int index = zipf.nextInt() - lead;
+		++wordCnts[index];
+		return dict[index];
+	}
+
+	public void resetWordCnts() {
+		for (int i = 0; i < wordCnts.length; ++i) {
+			wordCnts[i] = 0;
+		}
+	}
+
+	public Map<String, Long> getWordCounts() {
+		Map<String, Long> ret = new HashMap<String, Long>();
+		for (int i = 0; i < dict.length; ++i) {
+			if (wordCnts[i] > 0) {
+				ret.put(dict[i], wordCnts[i]);
+			}
+		}
+		return ret;
+	}
+}
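
Dictionary can likewise be exercised on its own; a short illustrative sketch (seed and parameters are arbitrary, and the demo again assumes the same package as the package-private class):

package org.apache.hadoop.zebra.mapreduce;

import java.util.Map;
import java.util.Random;

public class DictionaryDemo {
  public static void main(String[] args) {
    // 500 distinct words, lengths 3..8, most-to-least frequency ratio around 100.
    Dictionary dict = new Dictionary(new Random(42), 500, 3, 8, 100);
    for (int i = 0; i < 10000; ++i) {
      dict.nextWord(); // draws follow the Zipf distribution over the dictionary
    }
    Map<String, Long> counts = dict.getWordCounts();
    System.out.println(counts.size() + " distinct words drawn");
  }
}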

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TableMRSample.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TableMRSample.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TableMRSample.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TableMRSample.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.Tuple;
+
+/**
+ * This is a complete sample MR program for Table. It doesn't contain the
+ * 'read' part, but that should be similar and easier to write. Refer to the
+ * test cases in the same directory.
+ * 
+ * Assume the input files contain rows of word and count, separated by a space:
+ * 
+ * <pre>
+ * this 2
+ * is 1
+ * a 4 
+ * test 2 
+ * hello 1 
+ * world 3
+ * </pre>
+ * 
+ */
+public class TableMRSample {
+	static class MapClass extends
+	Mapper<LongWritable, Text, BytesWritable, Tuple> {
+		private BytesWritable bytesKey;
+		private Tuple tupleRow;
+
+		@Override
+		public void map(LongWritable key, Text value, Context context)
+		throws IOException, InterruptedException {
+
+			// value should contain "word count"
+			String[] wdct = value.toString().split(" ");
+			if (wdct.length != 2) {
+				// LOG the error
+				return;
+			}
+
+			byte[] word = wdct[0].getBytes();
+			bytesKey.set(word, 0, word.length);
+			tupleRow.set(0, new String(word));
+			tupleRow.set(1, Integer.parseInt(wdct[1]));
+
+			context.write(bytesKey, tupleRow);
+		}
+
+		@Override
+		public void setup(Context context) {
+			bytesKey = new BytesWritable();
+			try {
+				Schema outSchema = BasicTableOutputFormat.getSchema(context);
+				tupleRow = TypesUtils.createTuple(outSchema);
+			} catch (IOException e) {
+				throw new RuntimeException(e);
+			} catch (ParseException e) {
+				throw new RuntimeException(e);
+			}
+		}
+
+	}
+
+	public static void main(String[] args) throws ParseException, IOException, 
+	InterruptedException, ClassNotFoundException {
+		Job job = new Job();
+		job.setJobName("tableMRSample");
+		Configuration conf = job.getConfiguration();
+		conf.set("table.output.tfile.compression", "gz");
+
+		// input settings
+		job.setInputFormatClass(TextInputFormat.class);
+		job.setMapperClass(TableMRSample.MapClass.class);
+		FileInputFormat.setInputPaths(job, new Path(
+				"/user/joe/inputdata/input.txt"));
+
+		// output settings
+		Path outPath = new Path("/user/joe/outputdata/");
+		job.setOutputFormatClass(BasicTableOutputFormat.class);
+		BasicTableOutputFormat.setOutputPath(job, outPath);
+		// set the logical schema with 2 columns
+		BasicTableOutputFormat.setSchema(job, "word:string, count:int");
+		// for demo purposes, create 2 physical column groups
+		BasicTableOutputFormat.setStorageHint(job, "[word];[count]");
+
+		// set map-only job.
+		job.setNumReduceTasks(0);
+		job.submit();
+	}
+}
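
The javadoc above notes that the 'read' part is omitted. Below is a hedged sketch of what a matching map-only read job might look like, assuming the mapreduce-package TableInputFormat exposes setInputPaths and setProjection setters analogous to the mapred API; those method names, the mapper, and the paths are assumptions, not confirmed by this commit:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.zebra.mapreduce.TableInputFormat;
import org.apache.pig.data.Tuple;

public class TableMRSampleReader {
  // Zebra tables are read as (BytesWritable key, Tuple row) pairs.
  static class ReadMapClass extends Mapper<BytesWritable, Tuple, BytesWritable, Tuple> {
    @Override
    public void map(BytesWritable key, Tuple value, Context context)
        throws java.io.IOException, InterruptedException {
      // value carries the projected columns, e.g. (word, count)
      context.write(key, value);
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job();
    job.setJobName("tableMRSampleReader");
    job.setInputFormatClass(TableInputFormat.class);
    // Read the table written by TableMRSample above (path assumed).
    TableInputFormat.setInputPaths(job, new Path("/user/joe/outputdata/"));
    // Projection as a comma-separated column list (syntax assumed from the mapred API).
    TableInputFormat.setProjection(job, "word, count");
    job.setMapperClass(ReadMapClass.class);
    job.setOutputFormatClass(NullOutputFormat.class); // discard output in this sketch
    job.setNumReduceTasks(0);
    job.submit();
  }
}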