You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/01/22 22:17:22 UTC

svn commit: r614325 [4/6] - in /incubator/pig/branches/types: ./ lib/ scripts/ src/org/apache/pig/ src/org/apache/pig/builtin/ src/org/apache/pig/data/ src/org/apache/pig/impl/ src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/eval/ src/org/apac...

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileReader.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileReader.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileReader.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileReader.java Tue Jan 22 13:17:12 2008
@@ -15,75 +15,87 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.impl.io;
+/*
+package org.apache.pig.impl.io;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Iterator;
 
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Iterator;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.mapreduceExec.PigMapReduce;
 
-import org.apache.pig.data.Datum;
-import org.apache.pig.data.DatumImpl;
+
+public class DataBagFileReader {
+	File store;
+	
+	public DataBagFileReader(File f) throws IOException{
+		store = f;
+	}
+	
+    public static int notifyInterval = 1000;
+    public int numNotifies;
+	private class myIterator implements Iterator<Tuple>{
+		DataInputStream in;
+		Tuple nextTuple;
+        int curCall;
+		
+		public myIterator() throws IOException{
+            numNotifies = 0;
+			in = new DataInputStream(new BufferedInputStream(new FileInputStream(store)));
+			getNextTuple();
+		}
+		
+		private void getNextTuple() throws IOException{
+            if (curCall < notifyInterval - 1)
+                curCall ++;
+            else{
+                if (PigMapReduce.reporter != null)
+                    PigMapReduce.reporter.progress();
+                curCall = 0;
+                numNotifies ++;
+            }
 
-
-public class DataBagFileReader {
-	File store;
-	
-	public DataBagFileReader(File f) throws IOException{
-		store = f;
-	}
-	
-	private class myIterator implements Iterator<Datum>{
-		DataInputStream in;
-		Datum nextDatum;
-		
-		public myIterator() throws IOException{
-			in = new DataInputStream(new BufferedInputStream(new FileInputStream(store)));
-			getNextDatum();
-		}
-		
-		private void getNextDatum() throws IOException{
-			try{
-				/*
-				nextDatum = new Datum();
-		        nextDatum.readFields(in);
-				*/
-				nextDatum = DatumImpl.readDatum(in);
-			} catch (EOFException e) {
-				in.close();
-				nextDatum = null;
-			}
-		}
-		
-		public boolean hasNext(){
-			return nextDatum != null;
-		}
-		
-		public Datum next(){
-			Datum returnValue = nextDatum;
-			if (returnValue!=null){
-				try{
-					getNextDatum();
-				}catch (IOException e){
-					throw new RuntimeException(e.getMessage());
-				}
-			}
-			return returnValue;
-		}
-		
-		public void remove(){
-			throw new RuntimeException("Read only cursor");
-		}
-	}
-
-	public Iterator<Datum> content() throws IOException{
-		return new myIterator();		
-	}
-	
-	public void clear() throws IOException{
-		store.delete();
-	}
-}
+			try{
+				nextTuple = new Tuple();
+		        nextTuple.readFields(in);
+			} catch (EOFException e) {
+				in.close();
+				nextTuple = null;
+			}
+		}
+		
+		public boolean hasNext(){
+			return nextTuple != null;
+		}
+		
+		public Tuple next(){
+			Tuple returnValue = nextTuple;
+			if (returnValue!=null){
+				try{
+					getNextTuple();
+				}catch (IOException e){
+					throw new RuntimeException(e.getMessage());
+				}
+			}
+			return returnValue;
+		}
+		
+		public void remove(){
+			throw new RuntimeException("Read only cursor");
+		}
+	}
+
+	public Iterator<Tuple> content() throws IOException{
+		return new myIterator();		
+	}
+	
+	public void clear() throws IOException{
+		store.delete();
+	}
+}
+*/

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileWriter.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileWriter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileWriter.java Tue Jan 22 13:17:12 2008
@@ -15,44 +15,56 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.impl.io;
-
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.pig.data.Datum;
-
-
-
-public class DataBagFileWriter {
-	File store;
-	DataOutputStream out;
-
-	public DataBagFileWriter(File store) throws IOException{
-		this.store = store;
-		out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(store)));
-	}
-	
-	public void write(Datum d) throws IOException{
-		d.write(out);
-	}
+ /*
+package org.apache.pig.impl.io;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.pig.data.Tuple;
+
+
+
+public class DataBagFileWriter {
+	File store;
+	DataOutputStream out;
+
+	public DataBagFileWriter(File store) throws IOException{
+		this.store = store;
+		out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(store)));
+	}
+	
+	public void write(Tuple t) throws IOException{
+		t.write(out);
+	}
+	
+	public long write(Iterator<Tuple> iter) throws IOException{
 	
-	public void write(Iterator<Datum> iter) throws IOException{
-		while (iter.hasNext())
+		long initialSize = getFileLength();
+		while (iter.hasNext())
 			iter.next().write(out);
-	}
-	
-	public void close() throws IOException{
-		flush();
-		out.close();
-	}
+		
+		return getFileLength() - initialSize;
+	}
 	
-	public void flush() throws IOException{
+	public long getFileLength() throws IOException{
 		out.flush();
+		return store.length();
 	}
 	
-}
+	
+	public void close() throws IOException{
+		flush();
+		out.close();
+	}
+	
+	public void flush() throws IOException{
+		out.flush();
+	}
+	
+}
+*/

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java Tue Jan 22 13:17:12 2008
@@ -32,6 +32,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.PigServer.ExecType;
 import org.apache.pig.impl.PigContext;
@@ -158,7 +159,10 @@
        Path paths[] = null;
     	if (fs.exists(path)) {
     		if (fs.isFile(path)) return fs.open(path);
-        	paths = fs.listPaths(path);
+			FileStatus fileStat[] = fs.listStatus(path);
+			paths = new Path[fileStat.length];
+			for (int i = 0; i < fileStat.length; i++)
+        		paths[i] = fileStat[i].getPath();
 		} else {
 			// It might be a glob
 			if (!globMatchesFiles(path, paths, fs)) throw new IOException(path + " does not exist");

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/PigFile.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/PigFile.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/PigFile.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/PigFile.java Tue Jan 22 13:17:12 2008
@@ -27,7 +27,6 @@
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.Datum;
 import org.apache.pig.impl.PigContext;
 
 
@@ -45,8 +44,7 @@
     }
     
     public DataBag load(LoadFunc lfunc, PigContext pigContext) throws IOException {
-        DataBag content =
-			BagFactory.getInstance().getNewBag(Datum.DataType.TUPLE);
+        DataBag content = BagFactory.getInstance().newDefaultBag();
         InputStream is = FileLocalizer.open(file, pigContext);
         lfunc.bindTo(file, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
         Tuple f = null;
@@ -60,8 +58,8 @@
     public void store(DataBag data, StoreFunc sfunc, PigContext pigContext) throws IOException {
         BufferedOutputStream bos = new BufferedOutputStream(FileLocalizer.create(file, append, pigContext));
         sfunc.bindTo(bos);
-        for (Iterator<Datum> it = data.content(); it.hasNext();) {
-            Tuple row = (Tuple)it.next();
+        for (Iterator<Tuple> it = data.iterator(); it.hasNext();) {
+            Tuple row = it.next();
             sfunc.putNext(row);
         }
         sfunc.finish();

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Tue Jan 22 13:17:12 2008
@@ -21,8 +21,8 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.pig.data.Datum;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.eval.EvalSpec;
 import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -30,136 +30,153 @@
 
 
 
-public class LOCogroup extends LogicalOperator{
-	private static final long serialVersionUID = 1L;
-	   
-	protected ArrayList<EvalSpec> specs;
-		
-	public LOCogroup(List<LogicalOperator> inputs, ArrayList<EvalSpec> specs) {
-		super(inputs);
+public class LOCogroup extends LogicalOperator {
+    private static final long serialVersionUID = 1L;
+
+    protected ArrayList<EvalSpec> specs;
+
+    public LOCogroup(List<LogicalOperator> inputs,
+                     ArrayList<EvalSpec> specs) {
+        super(inputs);
         this.specs = specs;
         getOutputType();
     }
-	
-	@Override
-	public String name() {
-		return "CoGroup";
-	}
-	@Override
-	public String arguments() {
-	    StringBuffer sb = new StringBuffer();
-        
+
+    @Override
+    public String name() {
+    	if (inputs.size() == 1) return "Group";
+    	else return "CoGroup";
+    }
+    @Override
+    public String arguments() {
+        StringBuffer sb = new StringBuffer();
+
         for (int i = 0; i < specs.size(); i++) {
             sb.append(specs.get(i));
-            if (i+1 < specs.size()) sb.append(", ");
+            if (i + 1 < specs.size())
+                sb.append(", ");
         }
-        
+
         return sb.toString();
-	}
-    
-	public static Datum[] getGroupAndTuple(Datum d){
-		if (!(d instanceof Tuple)){
-			throw new RuntimeException("Internal Error: Evaluation of group expression did not return a tuple");
-		}
-		Tuple output = (Tuple)d;
-		if (output.arity() < 2){
-			throw new RuntimeException("Internal Error: Evaluation of group expression returned a tuple with <2 fields");
-		}
-		
-		Datum[] groupAndTuple = new Datum[2];
-    	if (output.arity() == 2){
-    		groupAndTuple[0] = output.getField(0);
-    		groupAndTuple[1] = output.getField(1);
-    	}else{
-    		Tuple group = new Tuple();
-    		for (int j=0; j<output.arity()-1; j++){
-    			group.appendField(output.getField(j));
-    		}
-    		groupAndTuple[0] = group;
-    		groupAndTuple[1] = output.getField(output.arity()-1);
-    	}
-		return groupAndTuple;
-	}
-	
+    }
+
+    public static Object[] getGroupAndTuple(Object d) {
+        if (!(d instanceof Tuple)) {
+            throw new RuntimeException
+                ("Internal Error: Evaluation of group expression did not return a tuple");
+        }
+        Tuple output = (Tuple) d;
+        if (output.size() < 2) {
+            throw new RuntimeException
+                ("Internal Error: Evaluation of group expression returned a tuple with <2 fields");
+        }
+
+        Object[] groupAndTuple = new Object[2];
+        try {
+            if (output.size() == 2) {
+                groupAndTuple[0] = output.get(0);
+                groupAndTuple[1] = output.get(1);
+            } else {
+                Tuple group = TupleFactory.getInstance().newTuple(output.size());
+                for (int j = 0; j < output.size() - 1; j++) {
+                    group.set(j, output.get(j));
+                }
+                groupAndTuple[0] = group;
+                groupAndTuple[1] = output.get(output.size() - 1);
+            }
+        } catch(IOException e) {
+            throw new RuntimeException(e);
+        }
+        return groupAndTuple;
+    }
+
     @Override
-	public TupleSchema outputSchema() {
-    	if (schema == null){
-	        schema = new TupleSchema();
-	        
-        	
-	        Schema groupElementSchema = specs.get(0).getOutputSchemaForPipe(getInputs().get(0).outputSchema());
-	        if (groupElementSchema == null){
-	        	groupElementSchema = new TupleSchema();
-		        groupElementSchema.setAlias("group");
-	        }else{
-		        
-		        if (!(groupElementSchema instanceof TupleSchema))
-		        	throw new RuntimeException("Internal Error: Schema of group expression was atomic");
-		        List<Schema> fields = ((TupleSchema)groupElementSchema).getFields();
-		        
-		        if (fields.size() < 2)
-		        	throw new RuntimeException("Internal Error: Schema of group expression retured <2 fields");
-
-		        if (fields.size() == 2){
-	        		groupElementSchema = fields.get(0);
-			        groupElementSchema.removeAllAliases();
-			        groupElementSchema.setAlias("group");
-		        }else{
-		        	groupElementSchema = new TupleSchema();
-		            groupElementSchema.setAlias("group");
-		            
-		            for (int i=0; i<fields.size()-1; i++){
-		            	((TupleSchema)groupElementSchema).add(fields.get(i));
-		            }
-		        }
-
-	        }
-	        
-	        schema.add(groupElementSchema);
-	        
-	        for (LogicalOperator lo : getInputs()) {
-	        	TupleSchema inputSchema = lo.outputSchema();
-	        	if (inputSchema == null)
-	        		inputSchema = new TupleSchema();  
-	            schema.add(inputSchema);
-	        }
-    	}
-    	
-    	schema.setAlias(alias);
+    public TupleSchema outputSchema() {
+        if (schema == null) {
+            schema = new TupleSchema();
+
+
+            Schema groupElementSchema =
+                specs.get(0).getOutputSchemaForPipe(getInputs().get(0).
+                                                    outputSchema());
+            if (groupElementSchema == null) {
+                groupElementSchema = new TupleSchema();
+                groupElementSchema.setAlias("group");
+            } else {
+
+                if (!(groupElementSchema instanceof TupleSchema))
+                    throw new RuntimeException
+                        ("Internal Error: Schema of group expression was atomic");
+                List<Schema> fields =
+                    ((TupleSchema) groupElementSchema).getFields();
+
+                if (fields.size() < 2)
+                    throw new RuntimeException
+                        ("Internal Error: Schema of group expression retured <2 fields");
+
+                if (fields.size() == 2) {
+                    groupElementSchema = fields.get(0);
+                    groupElementSchema.removeAllAliases();
+                    groupElementSchema.setAlias("group");
+                } else {
+                    groupElementSchema = new TupleSchema();
+                    groupElementSchema.setAlias("group");
+
+                    for (int i = 0; i < fields.size() - 1; i++) {
+                        ((TupleSchema) groupElementSchema).add(fields.get(i));
+                    }
+                }
+
+            }
+
+            schema.add(groupElementSchema);
+
+          for (LogicalOperator lo:getInputs()) {
+                TupleSchema inputSchema = lo.outputSchema();
+                if (inputSchema == null)
+                    inputSchema = new TupleSchema();
+                schema.add(inputSchema);
+            }
+        }
+
+        schema.setAlias(alias);
         return schema;
     }
-    
+
     @Override
-	public int getOutputType(){
-    	int outputType = FIXED;
-    	for (int i=0; i<getInputs().size(); i++){
-    		switch (getInputs().get(i).getOutputType()){
-    			case FIXED: 
-    				continue;
-    			case MONOTONE: 
-    				outputType = AMENDABLE;
-    				break;
-    			case AMENDABLE:
-    			default:	
-    				throw new RuntimeException("Can't feed a cogroup into another in the streaming case");
-    		} 
-    	}
-    	return outputType;
+    public int getOutputType() {
+        int outputType = FIXED;
+        for (int i = 0; i < getInputs().size(); i++) {
+            switch (getInputs().get(i).getOutputType()) {
+            case FIXED:
+                continue;
+            case MONOTONE:
+                outputType = AMENDABLE;
+                break;
+            case AMENDABLE:
+            default:
+                throw new RuntimeException
+                    ("Can't feed a cogroup into another in the streaming case");
+            }
+        }
+        return outputType;
     }
 
     @Override
-	public List<String> getFuncs() {
+    public List<String> getFuncs() {
         List<String> funcs = super.getFuncs();
-        for (EvalSpec spec: specs) {
+      for (EvalSpec spec:specs) {
             funcs.addAll(spec.getFuncs());
         }
         return funcs;
     }
 
-	public ArrayList<EvalSpec> getSpecs() {
-		return specs;
-	}
+    public ArrayList<EvalSpec> getSpecs() {
+        return specs;
+    }
 
-	
+	public void visit(LOVisitor v) {
+		v.visitCogroup(this);
+	}
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEval.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEval.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEval.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEval.java Tue Jan 22 13:17:12 2008
@@ -24,34 +24,36 @@
 
 
 
-public class LOEval extends LogicalOperator{
-	private static final long serialVersionUID = 1L;
-	   
-	protected EvalSpec spec;
+public class LOEval extends LogicalOperator {
+    private static final long serialVersionUID = 1L;
+
+    protected EvalSpec spec;
 
     public LOEval(LogicalOperator input, EvalSpec specIn) {
-    	super(input);
+        super(input);
         spec = specIn;
         getOutputType();
     }
 
     @Override
-	public String name() {
-        return "Eval";
+    public String name() {
+        return "Foreach";
     }
 
     @Override
-	public String arguments() {
+    public String arguments() {
         return spec.toString();
     }
 
     @Override
-	public TupleSchema outputSchema() {
+    public TupleSchema outputSchema() {
         if (schema == null) {
             //System.out.println("LOEval input: " + inputs[0].outputSchema());
             //System.out.println("LOEval spec: " + spec);
-            schema = (TupleSchema)spec.getOutputSchemaForPipe(getInputs().get(0).outputSchema());
-            
+            schema =
+                (TupleSchema) spec.getOutputSchemaForPipe(getInputs().get(0).
+                                                          outputSchema());
+
             //System.out.println("LOEval output: " + schema);
         }
         schema.setAlias(alias);
@@ -59,7 +61,7 @@
     }
 
     @Override
-	public int getOutputType() {
+    public int getOutputType() {
         switch (getInputs().get(0).getOutputType()) {
         case FIXED:
             return FIXED;
@@ -72,13 +74,17 @@
     }
 
     @Override
-	public List<String> getFuncs() {
+    public List<String> getFuncs() {
         List<String> funcs = super.getFuncs();
         funcs.addAll(spec.getFuncs());
         return funcs;
     }
 
-	public EvalSpec getSpec() {
-		return spec;
+    public EvalSpec getSpec() {
+        return spec;
+    }
+
+	public void visit(LOVisitor v) {
+		v.visitEval(this);
 	}
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java Tue Jan 22 13:17:12 2008
@@ -29,70 +29,67 @@
 
 
 public class LOLoad extends LogicalOperator {
-	private static final long serialVersionUID = 1L;
-	   
-	protected FileSpec inputFileSpec;
-
-	protected int  outputType = FIXED;
+    private static final long serialVersionUID = 1L;
 
+    protected FileSpec inputFileSpec;
 
-	public LOLoad(FileSpec inputFileSpec) throws IOException, ParseException{
-		super();
-		this.inputFileSpec = inputFileSpec;               
-	try
-	{
-        	LoadFunc storageFunc = (LoadFunc) PigContext.instantiateFuncFromSpec(inputFileSpec.getFuncSpec());                
-	}
-	catch (IOException e)
-	{
-		Throwable cause = e.getCause();
-		while (cause != null && cause.getClass().getName() != "java.lang.ClassNotFoundException")
-		{
-			System.out.println("cause = " + cause.getClass().getName());
-			cause = cause.getCause();
-		}
-
-		if (cause != null)
-		{
-			throw new ParseException("Load function " + inputFileSpec.getFuncSpec() + " not found");
-		}
-		else
-		{
-			throw e;
-		}
-		
-	}
+    protected int outputType = FIXED;
+
+
+    public LOLoad(FileSpec inputFileSpec) throws IOException, ParseException {
+        super();
+        this.inputFileSpec = inputFileSpec;
+        try {
+            LoadFunc storageFunc =
+                (LoadFunc) PigContext.instantiateFuncFromSpec(inputFileSpec.
+                                                              getFuncSpec());
+        } catch(IOException e) {
+            Throwable cause = e.getCause();
+            while (cause != null
+                   && cause.getClass().getName() !=
+                   "java.lang.ClassNotFoundException") {
+                System.out.println("cause = " + cause.getClass().getName());
+                cause = cause.getCause();
+            } if (cause != null) {
+                throw new ParseException("Load function " +
+                                         inputFileSpec.getFuncSpec() +
+                                         " not found");
+            } else {
+                throw e;
+            }
+
+        }
 
         //TODO: Handle Schemas defined by Load Functions
         schema = new TupleSchema();
     }
 
     @Override
-	public String name() {
+    public String name() {
         return "Load";
     }
-    
-    public FileSpec getInputFileSpec(){
-    	return inputFileSpec;
+
+    public FileSpec getInputFileSpec() {
+        return inputFileSpec;
     }
-    
+
     public void setInputFileSpec(FileSpec spec) {
-    	inputFileSpec = spec;
+        inputFileSpec = spec;
     }
-    
-	@Override
-	public String arguments() {
-    	return inputFileSpec.toString();
+
+    @Override
+    public String arguments() {
+        return inputFileSpec.toString();
     }
 
     @Override
-	public TupleSchema outputSchema() {
-    	schema.setAlias(alias);
+    public TupleSchema outputSchema() {
+        schema.setAlias(alias);
         return this.schema;
     }
 
     @Override
-	public int getOutputType() {
+    public int getOutputType() {
         return outputType;
     }
 
@@ -104,18 +101,22 @@
     }
 
     @Override
-	public String toString() {
-		StringBuffer result = new StringBuffer(super.toString());
-		result.append(" (outputType: ");
-		result.append(outputType);
-		result.append(')');
-		return result.toString();
-	}
+    public String toString() {
+        StringBuffer result = new StringBuffer(super.toString());
+        result.append(" (outputType: ");
+        result.append(outputType);
+        result.append(')');
+        return result.toString();
+    }
 
-	@Override
-	public List<String> getFuncs() {
+    @Override
+    public List<String> getFuncs() {
         List<String> funcs = super.getFuncs();
         funcs.add(inputFileSpec.getFuncName());
         return funcs;
     }
+
+	public void visit(LOVisitor v) {
+		v.visitLoad(this);
+	}
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORead.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORead.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORead.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORead.java Tue Jan 22 13:17:12 2008
@@ -24,78 +24,80 @@
 
 
 public class LORead extends LogicalOperator {
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
 
-	   
-	protected IntermedResult readFrom = null;
 
-	boolean readsFromSplit = false;
-    
-	@Override
-	public String toString() {
-		StringBuffer result = new StringBuffer(super.toString());
-		result.append(" (readsFromSplit: ");
-		result.append(readsFromSplit);
-		result.append(')');
-		return result.toString();
-	}
+    protected IntermedResult readFrom = null;
 
+    boolean readsFromSplit = false;
 
+     @Override
+     public String toString() {
+        StringBuffer result = new StringBuffer(super.toString());
+          result.append(" (readsFromSplit: ");
+          result.append(readsFromSplit);
+          result.append(')');
+          return result.toString();
+    }
 
-	//Since intermed result may have multiple outputs, which output do I read?
-    
+    //Since intermed result may have multiple outputs, which output do I read?
     public int splitOutputToRead = 0;
-	
-	public LORead(IntermedResult readFromIn) {
-		super();		
+
+    public LORead(IntermedResult readFromIn) {
+        super();
         readFrom = readFromIn;
-	}
-    
-	public LORead(IntermedResult readFromIn, int outputToRead) {
-		super();
-		readsFromSplit = true;
-		this.splitOutputToRead = outputToRead;		
+    }
+
+    public LORead(IntermedResult readFromIn, int outputToRead) {
+        super();
+        readsFromSplit = true;
+        this.splitOutputToRead = outputToRead;
         readFrom = readFromIn;
-	}
-	
-	public boolean readsFromSplit(){
-		return readsFromSplit;
-	}
-	
-	@Override
-	public String name() {
-		return "Read";
-	}
-	@Override
-	public String arguments() {
-		return alias;
-	}
-     
+    }
+
+    public boolean readsFromSplit() {
+        return readsFromSplit;
+    }
+
+    @Override
+    public String name() {
+        return "Read";
+    }
+    @Override
+    public String arguments() {
+        return alias;
+    }
+
     @Override
-	public TupleSchema outputSchema() {
-    	if (schema == null) {
-            if (readFrom.lp != null && readFrom.lp.root != null && readFrom.lp.root.outputSchema() != null) {
+    public TupleSchema outputSchema() {
+        if (schema == null) {
+            if (readFrom.lp != null && readFrom.lp.root != null
+                && readFrom.lp.root.outputSchema() != null) {
                 schema = readFrom.lp.root.outputSchema().copy();
             } else {
                 schema = new TupleSchema();
             }
-    	}
-    	
-    	schema.removeAllAliases();
+        }
+
+        schema.removeAllAliases();
         schema.setAlias(alias);
-        
+
         return schema;
     }
-    
-    
-	
-	@Override
-	public int getOutputType(){
-		return readFrom.getOutputType();
-	}
 
-	public IntermedResult getReadFrom() {
-		return readFrom;
+
+
+    @Override
+    public int getOutputType() {
+        return readFrom.getOutputType();
+    }
+
+    public IntermedResult getReadFrom() {
+        return readFrom;
+    }
+
+	public void visit(LOVisitor v) {
+		v.visitRead(this);
 	}
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java Tue Jan 22 13:17:12 2008
@@ -15,64 +15,67 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.impl.logicalLayer;
-
-
+package org.apache.pig.impl.logicalLayer;
+
+
 import org.apache.pig.impl.eval.EvalSpec;
 import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
 
-
-public class LOSort extends LogicalOperator {
-	private static final long serialVersionUID = 1L;
-	private EvalSpec sortSpec;
-	
-
-	protected EvalSpec spec;
-
-	public EvalSpec getSpec() {
-		return spec;
-	}
-
-
-
-	public LOSort( LogicalOperator input, EvalSpec sortSpec){
-		super(input);		
-		this.sortSpec = sortSpec;
-		getOutputType();
-	}
-	
-	@Override
-	public String name() {
-			return "SORT";
-	}
-	
-	@Override
-	public String arguments() {
-			return sortSpec.toString();
-	}
-		
-	@Override
-	public int getOutputType() {
-		switch(getInputs().get(0).getOutputType()){
-		case FIXED:
-			return FIXED;
-		default:
-			throw new RuntimeException("Blocking operator such as sort cannot handle streaming input");
-		}
-	}
-
-	@Override
-	public TupleSchema outputSchema() {
-		if (schema== null)
-			schema = getInputs().get(0).outputSchema().copy();
-		
-		schema.setAlias(alias);
-		return schema;
-		
-	}
-
-	public EvalSpec getSortSpec() {
-		return sortSpec;
-	}
-
-}
+
+public class LOSort extends LogicalOperator {
+    private static final long serialVersionUID = 1L;
+    private EvalSpec sortSpec;
+
+
+    protected EvalSpec spec;
+
+    public EvalSpec getSpec() {
+        return spec;
+    }
+    
+    public LOSort(LogicalOperator input, EvalSpec sortSpec) {
+        super(input);
+        this.sortSpec = sortSpec;
+        getOutputType();
+    }
+
+    @Override
+    public String name() {
+        return "SORT";
+    }
+
+    @Override
+    public String arguments() {
+        return sortSpec.toString();
+    }
+
+    @Override
+    public int getOutputType() {
+        switch (getInputs().get(0).getOutputType()) {
+        case FIXED:
+            return FIXED;
+        default:
+            throw new RuntimeException
+                ("Blocking operator such as sort cannot handle streaming input");
+        }
+    }
+
+    @Override
+    public TupleSchema outputSchema() {
+        if (schema == null)
+            schema = getInputs().get(0).outputSchema().copy();
+
+        schema.setAlias(alias);
+        return schema;
+
+    }
+
+    public EvalSpec getSortSpec() {
+        return sortSpec;
+    }
+
+	public void visit(LOVisitor v) {
+		v.visitSort(this);
+	}
+
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java Tue Jan 22 13:17:12 2008
@@ -15,41 +15,44 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.impl.logicalLayer;
-
-import java.util.ArrayList;
-import java.util.List;
+package org.apache.pig.impl.logicalLayer;
+
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.pig.impl.eval.cond.Cond;
 import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
-
-
-public class LOSplit extends LogicalOperator {
-	private static final long serialVersionUID = 1L;
-	   
-	List<Cond> conditions = new ArrayList<Cond>();
-	
-	public LOSplit(LogicalOperator input){
-		super(input);	
-	}
-	
-	public void addCond(Cond cond){
-		conditions.add(cond);
-	}
-	
-	@Override
-	public int getOutputType(){
-		return getInputs().get(0).getOutputType();
-	}
-	
-	public ArrayList<Cond> getConditions(){		
-		return new ArrayList<Cond>(conditions);
-	}
-	
-	@Override
-	public TupleSchema outputSchema(){
-		return getInputs().get(0).outputSchema().copy();
-	}
-	
-	
-}
+
+/** Logical operator for SPLIT: collects conditions; output type and schema come from its first input. */
+public class LOSplit extends LogicalOperator {
+    private static final long serialVersionUID = 1L;
+    // Conditions in the order they were added via addCond().
+      List<Cond> conditions = new ArrayList<Cond>();
+
+    public LOSplit(LogicalOperator input) {
+        super(input);
+    }
+    
+    public void addCond(Cond cond) {
+        conditions.add(cond);
+    }
+
+    @Override
+    public int getOutputType() {
+        return getInputs().get(0).getOutputType();
+    }
+    /** Returns a new list copy, so changes to it do not affect this operator's conditions. */
+    public ArrayList<Cond> getConditions() {
+        return new ArrayList<Cond> (conditions);
+    }
+
+    @Override
+    public TupleSchema outputSchema() {
+        return getInputs().get(0).outputSchema().copy();
+    }
+    /** Passes this operator to the visitor's visitSplit callback. */
+	public void visit(LOVisitor v) {
+		v.visitSplit(this);
+	}
+
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStore.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStore.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStore.java Tue Jan 22 13:17:12 2008
@@ -27,59 +27,62 @@
 
 
 public class LOStore extends LogicalOperator {
-	private static final long serialVersionUID = 1L;
-	   
-	protected FileSpec outputFileSpec;
+    private static final long serialVersionUID = 1L;
 
-	protected boolean append;
-    
+    protected FileSpec outputFileSpec;
 
-	public LOStore(LogicalOperator input, FileSpec fileSpec, boolean append) throws IOException{
-    	super(input);    	
+    protected boolean append;
+
+
+    public LOStore(LogicalOperator input,
+                   FileSpec fileSpec,
+                   boolean append) throws IOException {
+        super(input);
         this.outputFileSpec = fileSpec;
         this.append = append;
 
         //See if the store function spec is valid
-        try{
-        	StoreFunc StoreFunc = (StoreFunc) PigContext.instantiateFuncFromSpec(fileSpec.getFuncSpec());
-        }catch (Exception e){
-        	IOException ioe = new IOException(e.getMessage());
-        	ioe.setStackTrace(e.getStackTrace());
-        	throw ioe;
-        }
-        
-        getOutputType();
+        try {
+            StoreFunc StoreFunc =
+                (StoreFunc) PigContext.instantiateFuncFromSpec(
+                    fileSpec.getFuncSpec());
+        } catch(Exception e) {
+            IOException ioe = new IOException(e.getMessage());
+            ioe.setStackTrace(e.getStackTrace());
+            throw ioe;
+        } getOutputType();
     }
-    
-    
-    public FileSpec getOutputFileSpec(){
-    	return outputFileSpec;
-    }
-
-    
-		@Override
-	public String toString() {
-	
-		StringBuffer result = new StringBuffer(super.toString());
-		result.append(" (append: ");
-		result.append(append);
-		result.append(')');
-		return result.toString();
-	}
 
 
-	@Override
-	public String name() {
+    public FileSpec getOutputFileSpec() {
+        return outputFileSpec;
+    }
+
+
+    @Override
+    public String toString() {
+        StringBuffer result = new StringBuffer(super.toString());
+        result.append(" (append: ");
+        result.append(append);
+        result.append(')');
+        return result.toString();
+    }
+
+
+    @Override
+    public String name() {
         return "Store";
     }
 
     @Override
-	public TupleSchema outputSchema() {
-        throw new RuntimeException("Internal error: Asking for schema of a store operator.");
+    public TupleSchema outputSchema() {
+        throw new
+            RuntimeException
+            ("Internal error: Asking for schema of a store operator.");
     }
 
     @Override
-	public int getOutputType() {
+    public int getOutputType() {
         switch (getInputs().get(0).getOutputType()) {
         case FIXED:
             return FIXED;
@@ -91,14 +94,18 @@
     }
 
     @Override
-	public List<String> getFuncs() {
+    public List<String> getFuncs() {
         List<String> funcs = super.getFuncs();
         funcs.add(outputFileSpec.getFuncName());
         return funcs;
     }
 
 
-	public boolean isAppend() {
-		return append;
+    public boolean isAppend() {
+        return append;
+    }
+
+	public void visit(LOVisitor v) {
+		v.visitStore(this);
 	}
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java Tue Jan 22 13:17:12 2008
@@ -24,52 +24,59 @@
 
 
 public class LOUnion extends LogicalOperator {
-	private static final long serialVersionUID = 1L;
-	   
+    private static final long serialVersionUID = 1L;
+
     public LOUnion(List<LogicalOperator> inputsIn) {
-    	super(inputsIn);        
+        super(inputsIn);
     }
-
+    
     @Override
-	public String name() {
+    public String name() {
         return "Union";
     }
 
     @Override
-	public TupleSchema outputSchema() {
-    	if (schema == null){
-    		TupleSchema longest = getInputs().get(0).outputSchema();
-	        int current = 0;
-	        for (LogicalOperator lo : getInputs()) {
-	            if (lo != null && lo.outputSchema() != null && lo.outputSchema().numFields() > current) {
-	                longest = lo.outputSchema();
-	                current = longest.numFields();
-	            }
-	        }
-	        schema = longest.copy();
-    	}
-    	
-    	schema.setAlias(alias);
+    public TupleSchema outputSchema() {
+        if (schema == null) {
+            TupleSchema longest = getInputs().get(0).outputSchema();
+            int current = 0;
+          for (LogicalOperator lo:getInputs()) {
+                if (lo != null && lo.outputSchema() != null
+                        && lo.outputSchema().numFields() > current) {
+                    longest = lo.outputSchema();
+                    current = longest.numFields();
+                }
+            }
+            schema = longest.copy();
+        }
+
+        schema.setAlias(alias);
         return schema;
     }
-	
-	@Override
-	public int getOutputType(){
-	 	int outputType = FIXED;
-    	for (int i=0; i<getInputs().size(); i++){
-    		switch (getInputs().get(i).getOutputType()){
-    			case FIXED: 
-    				continue;
-    			case MONOTONE: 
-    				if (outputType == FIXED)
-    					outputType = MONOTONE;
-    				continue;
-    			case AMENDABLE:
-    				outputType = AMENDABLE;
-    			default:	
-    				throw new RuntimeException("Invalid input type to the UNION operator");
-    		} 
-    	}
-    	return outputType;
+
+    /** Returns AMENDABLE if any input is AMENDABLE, else MONOTONE if any is MONOTONE, else FIXED; other types throw RuntimeException. */
+    @Override
+    public int getOutputType() {
+        int outputType = FIXED;
+        for (LogicalOperator input : getInputs()) {
+            switch (input.getOutputType()) {
+            case FIXED:
+                continue;
+            case MONOTONE:
+                if (outputType == FIXED)
+                    outputType = MONOTONE;
+                continue;
+            case AMENDABLE:
+                outputType = AMENDABLE;
+                continue;   // previously fell through into default and always threw
+            default:
+                throw new RuntimeException("Invalid input type to the UNION operator");
+            }
+        }
+        return outputType;
+    }
+
+	public void visit(LOVisitor v) {
+		v.visitUnion(this);
 	}
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java Tue Jan 22 13:17:12 2008
@@ -27,58 +27,55 @@
 
 
 abstract public class LogicalOperator implements Serializable {
-    public String            alias                = null;
-    
-    public static final int  FIXED                = 1;
-    public static final int  MONOTONE             = 2;
-    public static final int  UPDATABLE            = 3;   // Reserved for future use
-    public static final int  AMENDABLE            = 4;
-
-	protected int            requestedParallelism = -1;
-	protected TupleSchema    schema               = null;
-	protected List<LogicalOperator> inputs;
-	
-	protected LogicalOperator(){
-		this.inputs = new ArrayList<LogicalOperator>();
-	}
-	
-	protected LogicalOperator(List<LogicalOperator> inputs) {
-		this.inputs = inputs;
-	}
-
-	protected LogicalOperator(LogicalOperator input) {
-		this.inputs = new ArrayList<LogicalOperator>();
-		inputs.add(input);
-	}
-
-	public String getAlias() {
-		return alias;
-	}
-
-	public void setAlias(String newAlias) {
-		alias = newAlias;
-	}
-
-	public int getRequestedParallelism() {
-		return requestedParallelism;
-	}
-
-	public void setRequestedParallelism(int newRequestedParallelism) {
-		requestedParallelism = newRequestedParallelism;
-	}
-
-	@Override
-	public String toString() {
-		StringBuffer result = new StringBuffer(super.toString());
-		result.append(" (alias: ");
-		result.append(alias);
-		result.append(", requestedParallelism: ");
-		result.append(requestedParallelism);
-		result.append(')');
-		return result.toString();
-	}
+    public String alias = null;
 
-	public abstract TupleSchema outputSchema();
+    public static final int FIXED = 1;
+    public static final int MONOTONE = 2;
+    public static final int UPDATABLE = 3;      // Reserved for future use
+    public static final int AMENDABLE = 4;
+
+    protected int requestedParallelism = -1;
+    protected TupleSchema schema = null;
+    protected List<LogicalOperator> inputs;
+
+    protected LogicalOperator() {
+        this.inputs = new ArrayList<LogicalOperator> ();
+    } protected LogicalOperator(List<LogicalOperator> inputs) {
+        this.inputs = inputs;
+    }
+
+    protected LogicalOperator(LogicalOperator input) {
+        this.inputs = new ArrayList<LogicalOperator> ();
+        inputs.add(input);
+    }
+
+    public String getAlias() {
+        return alias;
+    }
+
+    public void setAlias(String newAlias) {
+        alias = newAlias;
+    }
+
+    public int getRequestedParallelism() {
+        return requestedParallelism;
+    }
+
+    public void setRequestedParallelism(int newRequestedParallelism) {
+        requestedParallelism = newRequestedParallelism;
+    }
+
+    @Override public String toString() {
+        StringBuffer result = new StringBuffer(super.toString());
+        result.append(" (alias: ");
+        result.append(alias);
+        result.append(", requestedParallelism: ");
+        result.append(requestedParallelism);
+        result.append(')');
+        return result.toString();
+    }
+
+    public abstract TupleSchema outputSchema();
 
     public String name() {
         return "ROOT";
@@ -99,10 +96,17 @@
         }
         return funcs;
     }
-    
+
     public abstract int getOutputType();
 
-	public void setSchema(TupleSchema schema) {
-		this.schema = schema;
-	}
+    public void setSchema(TupleSchema schema) {
+        this.schema = schema;
+    }
+
+    /**
+     * Visit all of the logical operators in a tree, starting with this
+     * one.  
+     * @param v LOVisitor to visit this logical plan with.
+     */
+    public abstract void visit(LOVisitor v);
 }

Propchange: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Jan 22 13:17:12 2008
@@ -0,0 +1,8 @@
+
+TokenMgrError.java
+Token.java
+SimpleNode.java
+SimpleCharStream.java
+ParseException.java
+Node.java
+JJTQueryParserState.java

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue Jan 22 13:17:12 2008
@@ -180,7 +180,8 @@
 		if (spec instanceof CompositeEvalSpec)
 			spec = ((CompositeEvalSpec)spec).getSpecs().get(0);
 		if ( spec instanceof ConstSpec || 
-			(spec instanceof FuncEvalSpec && ((FuncEvalSpec)spec).getReturnType() == DataAtom.class))
+			(spec instanceof FuncEvalSpec &&
+                DataType.isAtomic(DataType.findType(((FuncEvalSpec)spec).getReturnType()))))
 			isAtomic = true;
 		else if (spec instanceof FuncEvalSpec)
 			isAtomic = false;
@@ -515,7 +516,7 @@
 }
 
 
-LogicalOperator OrderClause() : {LogicalOperator op; EvalSpec sortSpec = null; ProjectSpec projSpec;}
+LogicalOperator OrderClause() : {LogicalOperator op; EvalSpec sortSpec = null; ProjectSpec projSpec; String funcName;}
 {
 	(
 	op = NestedExpr() <BY> 
@@ -530,6 +531,17 @@
 		)
 	|	(sortSpec = Star() {sortSpec = new GenerateSpec(sortSpec);})
 	)
+    (
+        <USING>  funcName = QualifiedFunction()
+        {
+            try {
+                sortSpec.setComparatorName(funcName);
+            } catch (Exception e){
+                throw new ParseException(e.getMessage());
+            }
+        }
+    )?
+
 	)
 	{
 		return new LOSort(op, sortSpec);
@@ -607,13 +619,23 @@
 }
 
 EvalSpec NestedSortOrArrange(Schema over, Map<String, EvalSpec> specs):
-{EvalSpec sortSpec; ProjectSpec projSpec; EvalSpec item; Schema subSchema = null; Token t;}
+{EvalSpec sortSpec; ProjectSpec projSpec; EvalSpec item; Schema subSchema = null; Token t; String funcName;}
 {
 	(
 	( t = <ORDER> | t = <ARRANGE> )
 	item = BaseEvalSpec(over,specs) { subSchema = item.getOutputSchemaForPipe(over); }
-	<BY> ( (projSpec = SimpleProj(subSchema) {sortSpec = projSpec;}) 
-		| sortSpec = Star() )
+	<BY> ( (projSpec = SimpleProj(subSchema) {sortSpec = projSpec;})
+		| sortSpec = Star() )     
+    (
+        <USING>  funcName = QualifiedFunction()
+        {
+            try {
+                sortSpec.setComparatorName(funcName);
+            } catch (Exception e){
+                throw new ParseException(e.getMessage());
+            }
+        }
+    )?
 	)
 	{ return copyItemAndAddSpec(item,new SortDistinctSpec(false, sortSpec)); }
 }
@@ -932,6 +954,7 @@
 		return funcName;
 	}	
 }
+
 
 /**
  * Bug 831620 - '$' support

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java Tue Jan 22 13:17:12 2008
@@ -17,8 +17,6 @@
  */
 package org.apache.pig.impl.mapreduceExec;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -29,19 +27,21 @@
 import org.apache.log4j.Logger;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.IndexedTuple;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.Datum;
-import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.eval.EvalSpec;
 import org.apache.pig.impl.io.PigFile;
 import org.apache.pig.impl.physicalLayer.POMapreduce;
 import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.PigLogger;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.PigLogger;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskReport;
 import org.apache.hadoop.mapred.JobClient;
@@ -65,7 +65,17 @@
         numMRJobs = numMRJobsIn;
         mrJobNumber = 0;
     }
-        
+    /** Key comparator for Tuple that compares the serialized bytes directly. */
+    public static class PigWritableComparator extends WritableComparator {
+        public PigWritableComparator() {
+            super(Tuple.class);
+        }
+        /** Orders two serialized keys by WritableComparator.compareBytes over the given byte ranges. */
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+	    }
+    }
+
     static Random rand = new Random();
 
     /**
@@ -91,7 +101,7 @@
      * @throws IOException
      */
     public boolean launchPig(POMapreduce pom) throws IOException {
-		Logger log = PigLogger.getLogger();
+        Logger log = PigLogger.getLogger();
         JobConf conf = new JobConf(pom.pigContext.getConf());
         conf.setJobName(pom.pigContext.getJobName());
         boolean success = false;
@@ -112,7 +122,7 @@
 	{
 		FileOutputStream fos = new FileOutputStream(submitJarFile);
                	JarManager.createJar(fos, funcs, pom.pigContext);
-               	System.out.println("Job jar size = " + submitJarFile.length());
+               	log.debug("Job jar size = " + submitJarFile.length());
 		conf.setJar(submitJarFile.getPath());
         	String user = System.getProperty("user.name");
         	conf.setUser(user != null ? user : "Pigster");
@@ -133,20 +143,28 @@
             		conf.set("pig.pigContext", ObjectSerializer.serialize(pom.pigContext));
             
             	conf.setMapRunnerClass(PigMapReduce.class);
-            	if (pom.toCombine != null)
-                	conf.setCombinerClass(PigCombine.class);
+                if (pom.toCombine != null) {
+                    conf.setCombinerClass(PigCombine.class);
+                    //conf.setCombinerClass(PigMapReduce.class);
+                }
             	if (pom.quantilesFile!=null){
             		conf.set("pig.quantilesFile", pom.quantilesFile);
-            	}
+                }
+                else{
+                    // this is not a sort job - can use byte comparison to speed up processing
+                    conf.setOutputKeyComparatorClass(PigWritableComparator.class);					
+                }
             	if (pom.partitionFunction!=null){
             		conf.setPartitionerClass(SortPartitioner.class);
             	}
             	conf.setReducerClass(PigMapReduce.class);
             	conf.setInputFormat(PigInputFormat.class);
             	conf.setOutputFormat(PigOutputFormat.class);
-            	conf.setInputKeyClass(UTF8.class);
-            	conf.setInputValueClass(Tuple.class);
+            	// not used starting with 0.15 conf.setInputKeyClass(Text.class);
+            	// not used starting with 0.15 conf.setInputValueClass(Tuple.class);
             	conf.setOutputKeyClass(Tuple.class);
+            	if (pom.userComparator != null)
+                	conf.setOutputKeyComparatorClass(pom.userComparator);
             	conf.setOutputValueClass(IndexedTuple.class);
             	conf.set("pig.inputs", ObjectSerializer.serialize(pom.inputFileSpecs));
             
@@ -212,8 +230,8 @@
             	
             		// create an empty output file
                 	PigFile f = new PigFile(outputFile.toString(), false);
-                	f.store(new DataBag(Datum.DataType.TUPLE),
-						new PigStorage(), pom.pigContext);
+                	f.store(BagFactory.getInstance().newDefaultBag(),
+                        new PigStorage(), pom.pigContext);
                 
             	}
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigCombine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigCombine.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigCombine.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigCombine.java Tue Jan 22 13:17:12 2008
@@ -28,10 +28,10 @@
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.BigDataBag;
-import org.apache.pig.data.Datum;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.IndexedTuple;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.eval.EvalSpec;
 import org.apache.pig.impl.eval.collector.DataCollector;
@@ -47,17 +47,20 @@
     private OutputCollector oc;
     private int             index;
     private int             inputCount;
-    private BigDataBag      bags[];
-    private File            tmpdir;
+    private DataBag         bags[];
+    private TupleFactory    mTupleFactory = TupleFactory.getInstance();
+    // private File            tmpdir;
 
     public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
             throws IOException {
 
         try {
+            /*
             tmpdir = new File(job.get("mapred.task.id"));
             tmpdir.mkdirs();
 
             BagFactory.init(tmpdir);
+            */
             PigContext pigContext = (PigContext) ObjectSerializer.deserialize(job.get("pig.pigContext"));
             if (evalPipe == null) {
                 inputCount = ((ArrayList<FileSpec>)ObjectSerializer.deserialize(job.get("pig.inputs"))).size();
@@ -69,37 +72,37 @@
                 evalPipe = esp.setupPipe(finalout);
                 //throw new RuntimeException("combine spec: " + evalSpec + " combine pipe: " + esp.toString());
                 
-                bags = new BigDataBag[inputCount];
+                bags = new DataBag[inputCount];
                 for (int i = 0; i < inputCount; i++) {
-                    bags[i] = BagFactory.getInstance().getNewBigBag(Datum.DataType.TUPLE);
+                    bags[i] = BagFactory.getInstance().newDefaultBag();
                 }
             }
 
             PigSplit split = PigInputFormat.PigRecordReader.getPigRecordReader().getPigFileSplit();
             index = split.getIndex();
 
-            Datum groupName = ((Tuple) key).getField(0);
+            String groupName = (String)((Tuple) key).get(0);
             finalout.group = ((Tuple) key);
             finalout.index = index;
             
-            Tuple t = new Tuple(1 + inputCount);
-            t.setField(0, groupName);
+            Tuple t = mTupleFactory.newTuple(1 + inputCount);
+            t.set(0, groupName);
             for (int i = 1; i < 1 + inputCount; i++) {
                 bags[i - 1].clear();
-                t.setField(i, bags[i - 1]);
+                t.set(i, bags[i - 1]);
             }
 
             while (values.hasNext()) {
                 IndexedTuple it = (IndexedTuple) values.next();
-                t.getBagField(it.index + 1).add(it);
+                ((DataBag)t.get(it.index + 1)).add(it.toTuple());
             }
             for (int i = 0; i < inputCount; i++) {  // XXX: shouldn't we only do this if INNER flag is set?
-                if (t.getBagField(1 + i).isEmpty())
+                if (((DataBag)t.get(1 + i)).size() == 0)
                     return;
             }
 //          throw new RuntimeException("combine input: " + t.toString());
             evalPipe.add(t);
-            evalPipe.add(null); // EOF marker
+            // evalPipe.add(null); // EOF marker
         } catch (Throwable tr) {
             tr.printStackTrace();
             RuntimeException exp = new RuntimeException(tr.getMessage());
@@ -132,10 +135,11 @@
         }
 
         @Override
-		public void add(Datum d){
+		public void add(Object d){
             if (d == null) return;  // EOF marker from eval pipeline; ignore
             try{
-            	oc.collect(group, new IndexedTuple(((Tuple)d).getTupleField(0),index));
+            	// oc.collect(group, new IndexedTuple(((Tuple)d).getTupleField(0),index));
+                oc.collect(group, new IndexedTuple(((Tuple)d),index));
             }catch (IOException e){
             	throw new RuntimeException(e);
             }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigInputFormat.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigInputFormat.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigInputFormat.java Tue Jan 22 13:17:12 2008
@@ -24,7 +24,8 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -37,6 +38,7 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.eval.EvalSpec;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
@@ -45,7 +47,7 @@
 import org.apache.tools.bzip2r.CBZip2InputStream;
 
 
-public class PigInputFormat implements InputFormat, JobConfigurable {
+public class PigInputFormat implements InputFormat<Text, Tuple>, JobConfigurable {
 
     public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     	
@@ -87,15 +89,15 @@
             //paths.add(path);
             for (int j = 0; j < paths.size(); j++) {
                 Path fullPath = new Path(fs.getWorkingDirectory(), paths.get(j));
-                if (fs.isDirectory(fullPath)) {
-                	Path children[] = fs.listPaths(fullPath);
+                if (fs.getFileStatus(fullPath).isDir()) {
+                	FileStatus children[] = fs.listStatus(fullPath);
                 	for(int k = 0; k < children.length; k++) {
-                		paths.add(children[k]);
+                		paths.add(children[k].getPath());
                 	}
                 	continue;
                 }
-                long bs = fs.getBlockSize(fullPath);
-                long size = fs.getLength(fullPath);
+                long bs = fs.getFileStatus(fullPath).getBlockSize();
+                long size = fs.getFileStatus(fullPath).getLen();
                 long pos = 0;
                 String name = paths.get(j).getName();
                 if (name.endsWith(".gz")) {
@@ -114,7 +116,7 @@
         return splits.toArray(new PigSplit[splits.size()]);
     }
 
-   public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+   public RecordReader<Text, Tuple> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
         PigRecordReader r = new PigRecordReader(job, (PigSplit)split, compressionCodecs);
         return r;
     }
@@ -127,7 +129,7 @@
         codecList = conf.get("io.compression.codecs", "none");
     }
     
-    public static class PigRecordReader implements RecordReader {
+    public static class PigRecordReader implements RecordReader<Text, Tuple> {
         /**
          * This is a tremendously ugly hack to get around the fact that mappers do not have access
          * to their readers. We take advantage of the fact that RecordReader.next and Mapper.map is
@@ -146,6 +148,7 @@
         LoadFunc                loader;
         CompressionCodecFactory compressionFactory;
         JobConf job;
+        TupleFactory mTupleFactory = TupleFactory.getInstance();
         
         PigRecordReader(JobConf job, PigSplit split, CompressionCodecFactory compressionFactory) throws IOException {
             this.split = split;
@@ -182,15 +185,15 @@
         public JobConf getJobConf(){
         	return job;
         }
-        
-        public boolean next(Writable key, Writable value) throws IOException {
+
+        public boolean next(Text key, Tuple value) throws IOException {
             Tuple t = loader.getNext();
             if (t == null) {
                 return false;
             }
 
-            ((UTF8) key).set(split.getPath().getName());
-            ((Tuple)value).copyFrom(t);
+            key.set(split.getPath().getName());
+            value.reference(t);
             return true;
         }
 
@@ -206,19 +209,19 @@
             return split;
         }
 
-        public WritableComparable createKey() {
-            return new UTF8();
+        public Text createKey() {
+            return new Text();
         }
 
-        public Writable createValue() {
-            return new Tuple();
+        public Tuple createValue() {
+            return mTupleFactory.newTuple();
         }
 
-	public float getProgress() throws IOException {
-	    float progress = getPos() - split.getStart();
-	    float finish = split.getLength();
-	    return progress/finish;
-	}
+    	/** Fraction of the split consumed so far, capped at 1; 0 for an empty split (avoids 0/0 = NaN). */
+    	public float getProgress() throws IOException {
+    	    float finish = split.getLength();
+    	    return finish <= 0 ? 0.0f : Math.min(1.0f, (getPos() - split.getStart()) / finish);
+    	}
     }
 
     public void validateInput(JobConf arg0) throws IOException {

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java Tue Jan 22 13:17:12 2008
@@ -36,9 +36,9 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Datum;
 import org.apache.pig.data.IndexedTuple;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.eval.EvalSpec;
 import org.apache.pig.impl.eval.StarSpec;
@@ -87,9 +87,10 @@
     private int                       index;
     private int                       inputCount;
     private boolean                   isInner[];
-    private File                      tmpdir;
+    // private File                      tmpdir;
     private static PigContext pigContext = null;
     ArrayList<PigRecordWriter> sideFileWriters = new ArrayList<PigRecordWriter>();
+    TupleFactory                      mTupleFactory = TupleFactory.getInstance();
 
     /**
      * This function is called in MapTask by Hadoop as the Mapper.run() method. We basically pull
@@ -100,9 +101,11 @@
         PigMapReduce.reporter = reporter;
 
         oc = output;
+        /*
         tmpdir = new File(job.get("mapred.task.id"));
         tmpdir.mkdirs();
         BagFactory.init(tmpdir);
+        */
 
         setupMapPipe(reporter);
 
@@ -125,10 +128,12 @@
         PigMapReduce.reporter = reporter;
         
         try {
+            /*
             tmpdir = new File(job.get("mapred.task.id"));
             tmpdir.mkdirs();
 
             BagFactory.init(tmpdir);
+            */
 
             oc = output;
             if (evalPipe == null) {
@@ -136,22 +141,21 @@
             }
 
             DataBag[] bags = new DataBag[inputCount];
-            Datum groupName = ((Tuple) key).getField(0);
-            Tuple t = new Tuple(1 + inputCount);
-            t.setField(0, groupName);
+            String groupName = (String)((Tuple) key).get(0);
+            Tuple t = mTupleFactory.newTuple(1 + inputCount);
+            t.set(0, groupName);
             for (int i = 1; i < 1 + inputCount; i++) {
-                bags[i - 1] =
-					BagFactory.getInstance().getNewBag(Datum.DataType.TUPLE);
-                t.setField(i, bags[i - 1]);
+                bags[i - 1] = BagFactory.getInstance().newDefaultBag();
+                t.set(i, bags[i - 1]);
             }
 
             while (values.hasNext()) {
                 IndexedTuple it = (IndexedTuple) values.next();
-                t.getBagField(it.index + 1).add(it.toTuple());
+                ((DataBag)t.get(it.index + 1)).add(it.toTuple());
             }
             
             for (int i = 0; i < inputCount; i++) {
-                if (isInner[i] && t.getBagField(1 + i).isEmpty())
+                if (isInner[i] && ((DataBag)t.get(1 + i)).size() == 0)
                     return;
             }
             
@@ -300,14 +304,15 @@
     	}
     	
         @Override
-		public void add(Datum d){
+		public void add(Object d){
         	try{
 	            if (group == null) {
 	                oc.collect(null, (Tuple)d);
 	            } else {
-	            	Datum[] groupAndTuple = LOCogroup.getGroupAndTuple(d);
+	            	Object[] groupAndTuple = LOCogroup.getGroupAndTuple(d);
                     // wrap group label in a tuple, so it becomes writable.
-	            	oc.collect(new Tuple(groupAndTuple[0]), new IndexedTuple((Tuple)groupAndTuple[1], index));
+	            	oc.collect(mTupleFactory.newTuple(groupAndTuple[0]),
+                        new IndexedTuple((Tuple)groupAndTuple[1], index));
                 }
             }catch(IOException e){
             	throw new RuntimeException(e);
@@ -329,7 +334,7 @@
     	}
     	
         @Override
-		public void add(Datum d){    
+		public void add(Object d){    
         	try{
         		//System.out.println("Adding " + d + " to reduce output");
         		oc.collect(null, (Tuple)d);

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigSplit.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigSplit.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigSplit.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigSplit.java Tue Jan 22 13:17:12 2008
@@ -161,7 +161,9 @@
     	try{
     		return ois.readObject();
     	}catch (ClassNotFoundException cnfe){
-    		throw new IOException(cnfe);
+    		IOException newE = new IOException(cnfe.getMessage());
+                newE.initCause(cnfe);
+                throw newE;
     	}
     }
     

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java Tue Jan 22 13:17:12 2008
@@ -15,62 +15,66 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.impl.mapreduceExec;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
+package org.apache.pig.impl.mapreduceExec;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.io.FileLocalizer;
-
-
-public class SortPartitioner implements Partitioner {
-	Tuple[] quantiles;
-	
-	public int getPartition(WritableComparable key, Writable value,
-			int numPartitions) {
-		try{
-			Tuple keyTuple = (Tuple)key;
-			int index = Arrays.binarySearch(quantiles, keyTuple.getTupleField(0));
-			if (index < 0)
-				index = -index-1;
-			return Math.min(index, numPartitions - 1);
-		}catch(IOException e){
-			throw new RuntimeException(e);
-		}
-	}
-
-	public void configure(JobConf job) {
-		String quantilesFile = job.get("pig.quantilesFile", "");
-		if (quantilesFile.length() == 0)
-			throw new RuntimeException("Sort paritioner used but no quantiles found");
-		
-		try{
-			InputStream is = FileLocalizer.openDFSFile(quantilesFile,job);
-			BinStorage loader = new BinStorage();
-			loader.bindTo(quantilesFile, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
-			
-			Tuple t;
-			ArrayList<Tuple> quantiles = new ArrayList<Tuple>();
-			
-			while(true){
-				t = loader.getNext();
-				if (t==null)
-					break;
-				quantiles.add(t);
-			}
-			this.quantiles = quantiles.toArray(new Tuple[0]);
-		}catch (IOException e){
-			throw new RuntimeException(e);
-		}
-	}
-
-}
+
+
+public class SortPartitioner implements Partitioner {
+    Tuple[] quantiles;
+    WritableComparator comparator;
+    
+    public int getPartition(WritableComparable key, Writable value,
+            int numPartitions) {
+        try{
+            Tuple keyTuple = (Tuple)key;
+            int index = Arrays.binarySearch(quantiles, (Tuple)keyTuple.get(0), comparator);
+            if (index < 0)
+                index = -index-1;
+            return Math.min(index, numPartitions - 1);
+        }catch(IOException e){
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void configure(JobConf job) {
+        String quantilesFile = job.get("pig.quantilesFile", "");
+        if (quantilesFile.length() == 0)
+            throw new RuntimeException("Sort paritioner used but no quantiles found");
+        
+        try{
+            InputStream is = FileLocalizer.openDFSFile(quantilesFile,job);
+            BinStorage loader = new BinStorage();
+            loader.bindTo(quantilesFile, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
+            
+            Tuple t;
+            ArrayList<Tuple> quantiles = new ArrayList<Tuple>();
+            
+            while(true){
+                t = loader.getNext();
+                if (t==null)
+                    break;
+                quantiles.add(t);
+            }
+            this.quantiles = quantiles.toArray(new Tuple[0]);
+        }catch (IOException e){
+            throw new RuntimeException(e);
+        }
+
+        comparator = job.getOutputKeyComparator();
+    }
+
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/IntermedResult.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/IntermedResult.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/IntermedResult.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/IntermedResult.java Tue Jan 22 13:17:12 2008
@@ -24,8 +24,8 @@
 import org.apache.pig.PigServer.ExecType;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.Datum;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.io.PigFile;
@@ -59,7 +59,7 @@
     
     public IntermedResult() {
         executed = true;
-        databag = new DataBag(Datum.DataType.TUPLE);
+        databag = BagFactory.getInstance().newDefaultBag();
     }
     
     public IntermedResult(DataBag bag) {

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java Tue Jan 22 13:17:12 2008
@@ -19,19 +19,29 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.Map;
+import java.util.Iterator;
 
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.FunctionInstantiator;
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.builtin.RandomSampleLoader;
+import org.apache.pig.impl.eval.BinCondSpec;
 import org.apache.pig.impl.eval.ConstSpec;
 import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.eval.FilterSpec;
 import org.apache.pig.impl.eval.FuncEvalSpec;
 import org.apache.pig.impl.eval.GenerateSpec;
 import org.apache.pig.impl.eval.ProjectSpec;
+import org.apache.pig.impl.eval.CompositeEvalSpec;
+import org.apache.pig.impl.eval.MapLookupSpec;
 import org.apache.pig.impl.eval.SortDistinctSpec;
 import org.apache.pig.impl.eval.StarSpec;
+import org.apache.pig.impl.eval.EvalSpecVisitor;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.LOCogroup;
@@ -194,9 +204,35 @@
 
         } else { // push into "reduce" phase
             
-            // use combiner, if amenable
-            if (mro.toReduce == null && lo.getSpec().amenableToCombiner()) {
-            	//TODO
+            EvalSpec spec = lo.getSpec();
+
+            if (mro.toReduce == null && shouldCombine(spec)) {
+                // Push this spec into the combiner.  But we also need to
+                // create a new spec with a changed expected projection to
+                // push into the reducer.
+
+                if (mro.toCombine != null) {
+                    throw new AssertionError("Combiner already set.");
+                }
+                // mro.toCombine = spec;
+
+                // Now, we need to adjust the expected projection for the
+                // eval spec(s) we just pushed.  Also, this will change the
+                // function to be the final instead of general instance.
+                EvalSpec newSpec = spec.copy(pigContext);
+                newSpec.visit(new ReduceAdjuster(pigContext));
+                mro.addReduceSpec(newSpec);
+
+                // Adjust the function name for the combine spec, to set it
+                // to the initial function instead of the general
+                // instance.  Make a copy of the eval spec rather than
+                // adjusting the existing one, to prevent breaking the 
+                // logical plan in case another physical plan is generated
+                // from it later.
+                EvalSpec combineSpec = spec.copy(pigContext);
+                combineSpec.visit(new CombineAdjuster());
+                mro.toCombine = combineSpec;
+
             } else {
                 mro.addReduceSpec(lo.getSpec()); // otherwise, don't use combiner
             }
@@ -271,7 +307,216 @@
 		sortJob.addReduceSpec(new GenerateSpec(ps));
 	
     	sortJob.reduceParallelism = loSort.getRequestedParallelism();
+    	
+    	String comparatorFuncName = loSort.getSortSpec().getComparatorName();
+    	if (comparatorFuncName != null) {
+    		try {
+    			sortJob.userComparator = 
+    				(Class<WritableComparator>)Class.forName(comparatorFuncName);
+    		} catch (ClassNotFoundException e) {
+    			throw new RuntimeException("Unable to find user comparator " + comparatorFuncName, e);
+    		}
+    	}
+
     	return sortJob;
     }
-    
+
+    private boolean shouldCombine(EvalSpec spec) {
+        // Determine whether this is something we can combine or not.
+        // First, it must be a generate spec.
+        if (!(spec instanceof GenerateSpec)) {
+            return false;
+        }
+
+        GenerateSpec gen = (GenerateSpec)spec;
+
+        // Second, the first immediate child of the generate spec must be
+        // a project with a value of 0.
+        Iterator<EvalSpec> i = gen.getSpecs().iterator();
+        if (!i.hasNext()) return false;
+        EvalSpec s = i.next();
+        if (!(s instanceof ProjectSpec)) {
+            return false;
+        } else {
+            ProjectSpec p = (ProjectSpec)s;
+            if (p.numCols() > 1) return false;
+            else if (p.getCol() != 0) return false;
+        }
+
+        // Third, all subsequent immediate children of the generate spec
+        // must be func eval specs
+        while (i.hasNext()) {
+            s = i.next();
+            if (!(s instanceof FuncEvalSpec)) return false;
+        }
+
+        // Fourth, walk the entire tree of the generate spec and see if we
+        // can combine it.
+        CombineDeterminer cd = new CombineDeterminer();
+        gen.visit(cd);
+        return cd.useCombiner();
+    }
+
+    private class ReduceAdjuster extends EvalSpecVisitor {
+        private int position = 0;
+        FunctionInstantiator instantiator = null;
+
+        public ReduceAdjuster(FunctionInstantiator fi) {
+            instantiator = fi;
+        }
+
+        public void visitGenerate(GenerateSpec g) {
+            Iterator<EvalSpec> i = g.getSpecs().iterator();
+            for (position = 0; i.hasNext(); position++) {
+                i.next().visit(this);
+            }
+        }
+        
+        public void visitFuncEval(FuncEvalSpec fe) {
+            // Need to replace our arg spec with a project of our position.
+            // DON'T visit our args, they're exactly what we're trying to
+            // lop off.
+            // The first ProjectSpec in the Composite is needed because the
+            // tuples will come out of the combiner in the form (groupkey,
+            // {(x, y, z)}).  The second ProjectSpec contains the offset of
+            // the projection element we're interested in.
+            CompositeEvalSpec cs = new CompositeEvalSpec(new ProjectSpec(1));
+            cs.addSpec(new ProjectSpec(position));
+            fe.setArgs(new GenerateSpec(cs));
+
+
+            // Reset the function to call the final instance of itself
+            // instead of the general instance.  Have to instantiate the
+            // function itself first so we can find out if it's algebraic
+            // or not.
+            try {
+                fe.instantiateFunc(instantiator);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            fe.resetFuncToFinal();
+        }
+    }
+
+    private class CombineAdjuster extends EvalSpecVisitor {
+        private int position = 0;
+        
+        //We don't want to be performing any flattening in the combiner since the column numbers in
+        //the reduce spec assume that there is no combiner. If the combiner performs flattening, the column
+        //numbers get messed up. For now, since combiner works only with generate group, func1(), func2(),...,
+        //it suffices to write visitors for those eval spec types.
+
+        public void visitFuncEval(FuncEvalSpec fe) {
+            // Reset the function to call the initial instance of itself
+            // instead of the general instance.
+            fe.resetFuncToInitial();
+            fe.setFlatten(false);
+        }
+        
+
+        @Override
+        public void visitProject(ProjectSpec p) {
+            p.setFlatten(false);
+        }
+        
+        
+    }
+
+    private class CombineDeterminer extends EvalSpecVisitor {
+        private class FuncCombinable extends EvalSpecVisitor {
+            public boolean combinable = true;
+
+            @Override
+            public void visitBinCond(BinCondSpec bc) {
+                combinable = false;
+            }
+            
+            @Override
+            public void visitFilter(FilterSpec bc) {
+                combinable = false;
+            }
+
+            @Override
+            public void visitFuncEval(FuncEvalSpec bc) {
+                combinable = false;
+            }
+
+            @Override
+            public void visitSortDistinct(SortDistinctSpec bc) {
+                combinable = false;
+            }
+        };
+
+        private int shouldCombine = 0;
+
+        public boolean useCombiner() {
+            return shouldCombine > 0;
+        }
+
+        @Override
+        public void visitBinCond(BinCondSpec bc) {
+            // TODO Could be true if both are true.  But the logic in
+            // CombineAdjuster and ReduceAdjuster doesn't know how to handle
+            // binconds, so just do false for now.
+            shouldCombine = -1;
+        }
+
+        @Override
+        public void visitCompositeEval(CompositeEvalSpec ce) {
+            // If we've already determined we're not combinable, stop.
+            if (shouldCombine < 0) return;
+
+            for (EvalSpec spec: ce.getSpecs()) {
+                spec.visit(this);
+            }
+        }
+
+        // ConstSpec is a NOP, as it neither benefits from nor
+        // prevents combinability.
+        
+        @Override
+        public void visitFilter(FilterSpec f) {
+            shouldCombine = -1;
+        }
+
+        @Override
+        public void visitFuncEval(FuncEvalSpec fe) {
+            // Check the function's arguments, to make sure they are
+            // combinable.
+            FuncCombinable fc = new FuncCombinable();
+            fe.getArgs().visit(fc);
+            if (!fc.combinable) {
+                shouldCombine = -1;
+                return;
+            }
+            
+            if (fe.combinable()) shouldCombine = 1;
+            else shouldCombine = -1;
+        }
+
+        @Override
+        public void visitGenerate(GenerateSpec g) {
+            // If we've already determined we're not combinable, stop.
+            if (shouldCombine < 0) return;
+
+            for (EvalSpec spec: g.getSpecs()) {
+                spec.visit(this);
+            }
+        }
+
+        // MapLookupSpec is a NOP, as it neither benefits from nor
+        // prevents combinability.
+        
+        // ProjectSpec is a NOP, as it neither benefits from nor
+        // prevents combinability.
+        
+        @Override
+        public void visitSortDistinct(SortDistinctSpec sd) {
+            shouldCombine = -1;
+        }
+
+        // StarSpec is a NOP, as it neither benefits from nor
+        // prevents combinability.
+    }
+
 }