You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/16 03:09:32 UTC

svn commit: r656913 [2/3] - in /incubator/pig/branches/types: ./ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/ src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/mapReduceLayer...

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserFunc.java?rev=656913&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserFunc.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserFunc.java Thu May 15 18:09:30 2008
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class POUserFunc extends ExpressionOperator {
+
+	transient EvalFunc func;
+	Tuple t1, t2;
+	private final Log log = LogFactory.getLog(getClass());
+	String funcSpec;
+	private final byte INITIAL = 0;
+	private final byte INTERMEDIATE = 1;
+	private final byte FINAL = 2;
+
+	public POUserFunc(OperatorKey k, int rp, List inp) {
+		super(k, rp);
+		inputs = inp;
+
+	}
+
+	public POUserFunc(OperatorKey k, int rp, List inp, String funcSpec) {
+		this(k, rp, inp, funcSpec, null);
+
+		instantiateFunc();
+	}
+	
+	public POUserFunc(OperatorKey k, int rp, List inp, String funcSpec, EvalFunc func) {
+//		super(k, rp, inp);
+        super(k, rp);
+        super.setInputs(inp);
+		this.funcSpec = funcSpec;
+		this.func = func;
+	}
+
+	private void instantiateFunc() {
+		this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(this.funcSpec);
+	}
+
+	private Result getNext() throws ExecException {
+		Tuple t = null;
+		Result result = new Result();
+		// instantiate the function if its null
+		if (func == null)
+			instantiateFunc();
+
+		try {
+			if (inputAttached) {
+				result.result = func.exec(input);
+				result.returnStatus = (result.result != null) ? POStatus.STATUS_OK
+						: POStatus.STATUS_EOP;
+				return result;
+			} else {
+				Result in = inputs.get(0).getNext(t);
+				if (in.returnStatus == POStatus.STATUS_EOP) {
+					result.returnStatus = POStatus.STATUS_EOP;
+					return result;
+				}
+				result.result = func.exec((Tuple) in.result);
+				result.returnStatus = POStatus.STATUS_OK;
+				return result;
+			}
+		} catch (IOException e) {
+			log.error(e);
+			//throw new ExecException(e.getCause());
+		}
+		result.returnStatus = POStatus.STATUS_ERR;
+		return result;
+	}
+
+	@Override
+	public Result getNext(Tuple tIn) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(DataBag db) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Integer i) throws ExecException {
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Boolean b) throws ExecException {
+
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(DataByteArray ba) throws ExecException {
+
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Double d) throws ExecException {
+
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Float f) throws ExecException {
+
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Long l) throws ExecException {
+
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(Map m) throws ExecException {
+
+		return getNext();
+	}
+
+	@Override
+	public Result getNext(String s) throws ExecException {
+
+		return getNext();
+	}
+
+	public void setAlgebraicFunction(Byte Function) {
+		// This will only be used by the optimizer for putting correct functions
+		// in the mapper,
+		// combiner and reduce. This helps in maintaining the physical plan as
+		// is without the
+		// optimiser having to replace any operators.
+		// You wouldn't be able to make two calls to this function on the same
+		// algebraic EvalFunc as
+		// func is being changed.
+		switch (Function) {
+		case INITIAL:
+			func = (EvalFunc) PigContext.instantiateFuncFromSpec(getInitial());
+			setResultType(DataType.findType(((EvalFunc) func).getReturnType()));
+			break;
+		case INTERMEDIATE:
+			func = (EvalFunc) PigContext.instantiateFuncFromSpec(getIntermed());
+			setResultType(DataType.findType(((EvalFunc) func).getReturnType()));
+			break;
+		case FINAL:
+			func = (EvalFunc) PigContext.instantiateFuncFromSpec(getFinal());
+			setResultType(DataType.findType(((EvalFunc) func).getReturnType()));
+			break;
+
+		}
+	}
+
+	public String getInitial() {
+		if (func == null)
+			instantiateFunc();
+
+		if (func instanceof Algebraic) {
+			return ((Algebraic) func).getInitial();
+		} else {
+			log
+					.error("Attempt to run a non-algebraic function as an algebraic function");
+		}
+		return null;
+	}
+
+	public String getIntermed() {
+		if (func == null)
+			instantiateFunc();
+
+		if (func instanceof Algebraic) {
+			return ((Algebraic) func).getIntermed();
+		} else {
+			log
+					.error("Attempt to run a non-algebraic function as an algebraic function");
+		}
+		return null;
+	}
+
+	public String getFinal() {
+		if (func == null)
+			instantiateFunc();
+
+		if (func instanceof Algebraic) {
+			return ((Algebraic) func).getFinal();
+		} else {
+			log
+					.error("Attempt to run a non-algebraic function as an algebraic function");
+		}
+		return null;
+	}
+
+	public Type getReturnType() {
+		if (func == null)
+			instantiateFunc();
+
+		return func.getReturnType();
+	}
+
+	public void finish() {
+		if (func == null)
+			instantiateFunc();
+
+		func.finish();
+	}
+
+	public Schema outputSchema(Schema input) {
+		if (func == null)
+			instantiateFunc();
+
+		return func.outputSchema(input);
+	}
+
+	public Boolean isAsynchronous() {
+		if (func == null)
+			instantiateFunc();
+		
+		return func.isAsynchronous();
+	}
+
+	@Override
+	public String name() {
+	    if(funcSpec!=null)
+	        return "POUserFunc" + "(" + funcSpec + ")" + " - " + mKey.toString();
+        else
+            return "POUserFunc" + "(" + "DummySpec" + ")" + " - " + mKey.toString();
+	}
+
+	@Override
+	public boolean supportsMultipleInputs() {
+
+		return false;
+	}
+
+	@Override
+	public boolean supportsMultipleOutputs() {
+
+		return false;
+	}
+
+	@Override
+	public void visit(ExprPlanVisitor v) throws VisitorException {
+
+		v.visitUserFunc(this);
+	}
+
+    public String getFuncSpec() {
+        return funcSpec;
+    }
+
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanWalker.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanWalker.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanWalker.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanWalker.java Thu May 15 18:09:30 2008
@@ -56,9 +56,14 @@
      */
     public abstract PlanWalker<O, P> spawnChildWalker(P plan);
 
-    public P getPlan() {
+	public P getPlan() {
         return mPlan ;
     }
+    
+    public void setPlan(P plan) {
+        mPlan = plan;
+    }
+
 }
 
 

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java?rev=656913&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java Thu May 15 18:09:30 2008
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.impl.mapReduceLayer.MRCompiler;
+import org.apache.pig.impl.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POSplit;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POCast;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.ComparisonOperator;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.apache.pig.test.utils.GenRandomData;
+import org.apache.pig.test.utils.TestHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestJobSubmission extends junit.framework.TestCase{
+    
+    
+    static PigContext pc;
+    String ldFile;
+    String expFile;
+    PhysicalPlan<PhysicalOperator> php = new PhysicalPlan<PhysicalOperator>();
+    String stFile;
+    String hadoopLdFile;
+    String grpName;
+    Random r = new Random();
+    String curDir;
+    String inpDir;
+    String golDir;
+    
+    static {
+        MiniCluster cluster = MiniCluster.buildCluster();
+        pc = new PigContext();
+        try {
+            pc.connect();
+        } catch (ExecException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+        GenPhyOp.setPc(pc);
+        
+    }
+    @Before
+    public void setUp() throws Exception{
+        curDir = System.getProperty("user.dir");
+        inpDir = curDir + File.separatorChar + "test/org/apache/pig/test/data/InputFiles/";
+        golDir = curDir + File.separatorChar + "test/org/apache/pig/test/data/GoldenFiles/";
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+    }
+    
+    private void generateInput(int numTuples) throws ExecException{
+        
+        DataBag inpDb = GenRandomData.genRandSmallTupDataBag(r, numTuples, 1000);
+        
+        POProject proj = new POProject(new OperatorKey("", r.nextLong()));
+        Tuple t = new DefaultTuple();
+        t.append(inpDb);
+        proj.attachInput(t);
+        proj.setColumn(0);
+        proj.setOverloaded(true);
+        proj.setResultType(DataType.TUPLE);
+        
+        List<PhysicalOperator> inps = new ArrayList<PhysicalOperator>();
+        inps.add(proj);
+        
+        POStore str = new POStore(new OperatorKey("", r.nextLong()));
+        str.setInputs(inps);
+        
+        FileSpec fSpec = new FileSpec(ldFile,PigStorage.class.getName());
+        
+        str.setSFile(fSpec);
+        str.setPc(pc);
+        str.store();
+    }
+    
+    private void setUp1(boolean gen) throws Exception {
+        ldFile = "file:" + inpDir + "jsTst1.txt";
+        expFile = ldFile;
+        stFile = "jsTst1";
+        grpName = "jobSubTst1";
+        
+        if(gen){
+            generateInput(100);
+            return;
+        }
+        
+        hadoopLdFile = FileLocalizer.hadoopify(ldFile, pc);
+
+        FileSpec LFSpec = new FileSpec(hadoopLdFile,PigStorage.class.getName());
+        FileSpec SFSpec = new FileSpec(stFile,PigStorage.class.getName());
+
+        POLoad ld = new POLoad(new OperatorKey("", r.nextLong()));
+        POStore st = new POStore(new OperatorKey("", r.nextLong()));
+        ld.setPc(pc);
+        ld.setLFile(LFSpec);
+        st.setPc(pc);
+        st.setSFile(SFSpec);
+        
+        php.add(ld);
+        php.add(st);
+        php.connect(ld, st);
+     }
+
+//    @Test
+    public void testCompile1() throws Exception {
+        boolean gen = false;
+
+        setUp1(gen);
+        
+        if(gen)
+            return;
+
+        submit();
+        
+        assertEquals(true, FileLocalizer.fileExists(stFile, pc));
+        
+        FileSpec fSpecExp = new FileSpec(expFile,PigStorage.class.getName());
+        FileSpec fSpecAct = new FileSpec(stFile,PigStorage.class.getName());
+        
+        assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
+    }
+    
+    private void setUp2(boolean gen) throws Exception {
+        ldFile = "file:" + inpDir + "jsTst2.txt";
+        expFile = ldFile;
+        stFile = "jsTst2";
+        grpName = "jobSubTst2";
+        
+        if(gen){
+            generateInput(1000);
+            return;
+        }
+        
+        hadoopLdFile = FileLocalizer.hadoopify(ldFile, pc);
+
+        FileSpec LFSpec = new FileSpec(hadoopLdFile,PigStorage.class.getName());
+        FileSpec SFSpec = new FileSpec(stFile,PigStorage.class.getName());
+
+        POLoad ld = new POLoad(new OperatorKey("", r.nextLong()));
+        POStore st = new POStore(new OperatorKey("", r.nextLong()));
+        ld.setPc(pc);
+        ld.setLFile(LFSpec);
+        st.setPc(pc);
+        st.setSFile(SFSpec);
+        
+        php.add(ld);
+        php.add(st);
+        php.connect(ld, st);
+     }
+
+//    @Test
+    public void testCompile2() throws Exception {
+        boolean gen = false;
+
+        setUp2(gen);
+        
+        if(gen)
+            return;
+
+        submit();
+        
+        assertEquals(true, FileLocalizer.fileExists(stFile, pc));
+        
+        FileSpec fSpecExp = new FileSpec(expFile,PigStorage.class.getName());
+        FileSpec fSpecAct = new FileSpec(stFile,PigStorage.class.getName());
+        
+        assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
+    }
+    
+    private void setUp3(boolean gen) throws Exception {
+        ldFile = "file:" + inpDir + "jsTst1.txt";
+        expFile = "file:" + golDir + "jsTst3";
+        stFile = "jsTst3";
+        grpName = "jobSubTst3";
+        
+        if(gen){
+            generateInput(1000);
+            return;
+        }
+        
+        hadoopLdFile = FileLocalizer.hadoopify(ldFile, pc);
+
+        FileSpec LFSpec = new FileSpec(hadoopLdFile,PigStorage.class.getName());
+        FileSpec SFSpec = new FileSpec(stFile,PigStorage.class.getName());
+
+        POLoad ld = new POLoad(new OperatorKey("", r.nextLong()));
+        POStore st = new POStore(new OperatorKey("", r.nextLong()));
+        ld.setPc(pc);
+        ld.setLFile(LFSpec);
+        st.setPc(pc);
+        st.setSFile(SFSpec);
+        
+        int[] flds = {0,1};
+        Tuple sample = new DefaultTuple();
+        sample.append(new String("S"));
+        sample.append(new Integer("10"));
+        
+        POForEach fe = GenPhyOp.topForEachOPWithPlan(flds , sample);
+        
+        POFilter fl = GenPhyOp.topFilterOpWithProj(1, 500, GenPhyOp.LT);
+        
+        php.add(ld);
+        php.add(fe);
+        php.connect(ld, fe);
+        
+        php.add(fl);
+        php.connect(fe, fl);
+        
+        php.add(st);
+        php.connect(fl, st);
+     }
+
+//    @Test
+    public void testCompile3() throws Exception {
+        boolean gen = false;
+
+        setUp3(gen);
+        
+        if(gen)
+            return;
+
+        submit();
+        
+        assertEquals(true, FileLocalizer.fileExists(stFile, pc));
+        
+        FileSpec fSpecExp = new FileSpec(expFile,PigStorage.class.getName()+"(',')");
+        FileSpec fSpecAct = new FileSpec(stFile,PigStorage.class.getName());
+        
+        assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
+    }
+    
+    private void setUp4(boolean gen) throws Exception {
+        ldFile = "file:" + inpDir + "jsTst1.txt";
+        expFile = "file:" + golDir + "jsTst4";
+        stFile = "jsTst4";
+        grpName = "jobSubTst4";
+        
+        if(gen){
+            generateInput(1000);
+            return;
+        }
+        
+        hadoopLdFile = FileLocalizer.hadoopify(ldFile, pc);
+
+        FileSpec LFSpec = new FileSpec(hadoopLdFile,PigStorage.class.getName());
+        FileSpec SFSpec = new FileSpec(stFile,PigStorage.class.getName());
+
+        POLoad ld = new POLoad(new OperatorKey("", r.nextLong()));
+        POStore st = new POStore(new OperatorKey("", r.nextLong()));
+        ld.setPc(pc);
+        ld.setLFile(LFSpec);
+        st.setPc(pc);
+        st.setSFile(SFSpec);
+        
+        POSplit spl = GenPhyOp.topSplitOp();
+        POFilter fl1 = GenPhyOp.topFilterOpWithProjWithCast(1, 200, GenPhyOp.LT);
+        POFilter fl2 = GenPhyOp.topFilterOpWithProjWithCast(1, 800, GenPhyOp.GT);
+        
+        POUnion un = GenPhyOp.topUnionOp();
+        
+        php.add(ld);
+        php.add(spl);
+        php.connect(ld, spl);
+        
+        php.add(fl1);
+        php.connect(spl, fl1);
+        
+        php.add(fl2);
+        php.connect(spl, fl2);
+        
+        php.add(un);
+        php.connect(fl1, un);
+        php.connect(fl2, un);
+        
+        php.add(st);
+        php.connect(un, st);
+     }
+
+//    @Test
+    public void testCompile4() throws Exception {
+        boolean gen = false;
+
+        setUp4(gen);
+        
+        if(gen)
+            return;
+        
+        submit();
+        
+        assertEquals(true, FileLocalizer.fileExists(stFile, pc));
+        
+        FileSpec fSpecExp = new FileSpec(expFile,PigStorage.class.getName()+"(',')");
+        FileSpec fSpecAct = new FileSpec(stFile,PigStorage.class.getName());
+        
+        assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
+        
+    }
+    
+    private void setUp5(boolean gen) throws Exception {
+        ldFile = "file:" + inpDir + "jsTst5.txt";
+        expFile = ldFile;
+        stFile = "jsTst5";
+        grpName = "jobSubTst5";
+        
+        if(gen){
+            generateInput(1000);
+            return;
+        }
+        
+        hadoopLdFile = FileLocalizer.hadoopify(ldFile, pc);
+
+        FileSpec LFSpec = new FileSpec(hadoopLdFile,PigStorage.class.getName()+"(',')");
+        FileSpec SFSpec = new FileSpec(stFile,PigStorage.class.getName());
+
+        POLoad ld = new POLoad(new OperatorKey("", r.nextLong()));
+        POStore st = new POStore(new OperatorKey("", r.nextLong()));
+        ld.setPc(pc);
+        ld.setLFile(LFSpec);
+        st.setPc(pc);
+        st.setSFile(SFSpec);
+        
+        Tuple sample = new DefaultTuple();
+        sample.append("S");
+        sample.append(1);
+        POLocalRearrange lr = GenPhyOp.topLocalRearrangeOPWithPlan(0, 1, sample);
+        
+        POGlobalRearrange gr = GenPhyOp.topGlobalRearrangeOp();
+        
+        POPackage pk = GenPhyOp.topPackageOp();
+        pk.setKeyType(DataType.INTEGER);
+        pk.setNumInps(1);
+        boolean[] inner = {false}; 
+        pk.setInner(inner);
+        
+        POForEach fe = GenPhyOp.topForEachOPWithPlan(1);
+        
+        php.add(ld);
+        php.add(lr);
+        php.connect(ld, lr);
+        
+        php.add(gr);
+        php.connect(lr, gr);
+        
+        php.add(pk);
+        php.connect(gr, pk);
+        
+        php.add(fe);
+        php.connect(pk, fe);
+        
+        php.add(st);
+        php.connect(fe, st);
+     }
+
+    @Test
+    public void testCompile5() throws Exception {
+        boolean gen = false;
+
+        setUp5(gen);
+        
+        if(gen)
+            return;
+        
+        submit();
+        
+        assertEquals(true, FileLocalizer.fileExists(stFile, pc));
+        
+        FileSpec fSpecExp = new FileSpec(expFile,PigStorage.class.getName()+"(',')");
+        FileSpec fSpecAct = new FileSpec(stFile,PigStorage.class.getName());
+        
+        assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
+        
+    }
+    
+    private void submit() throws Exception{
+        assertEquals(true, FileLocalizer.fileExists(hadoopLdFile, pc));
+        MapReduceLauncher mrl = new MapReduceLauncher();
+        mrl.launchPig(php, grpName, pc);  
+    }
+}

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestLocalJobSubmission.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLocalJobSubmission.java?rev=656913&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLocalJobSubmission.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLocalJobSubmission.java Thu May 15 18:09:30 2008
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.impl.mapReduceLayer.LocalLauncher;
+import org.apache.pig.impl.mapReduceLayer.MRCompiler;
+import org.apache.pig.impl.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POSplit;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POCast;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.ComparisonOperator;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.apache.pig.test.utils.GenRandomData;
+import org.apache.pig.test.utils.TestHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestLocalJobSubmission extends junit.framework.TestCase{
+    static PigContext pc;
+    String ldFile;
+    String expFile;
+    PhysicalPlan<PhysicalOperator> php = new PhysicalPlan<PhysicalOperator>();
+    String stFile;
+    String grpName;
+    String curDir;
+    String outDir;
+    String inpDir;
+    String golDir;
+    Random r = new Random();
+    
+    static {
+//        MiniCluster cluster = MiniCluster.buildCluster();
+        pc = new PigContext();
+        try {
+            pc.connect();
+        } catch (ExecException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+        GenPhyOp.setPc(pc);
+    }
+    
+    
+    
+    /*@Override
+    protected void finalize() throws Throwable {
+        super.finalize();
+        if(outDir!=null && new File(outDir).exists())
+            rmrf(outDir);
+    }*/
+
+    @Before
+    public void setUp() throws Exception{
+        curDir = System.getProperty("user.dir");
+        inpDir = curDir + File.separatorChar + "test/org/apache/pig/test/data/InputFiles/";
+        outDir = curDir + File.separatorChar + "test/org/apache/pig/test/data/OutputFiles/";
+        golDir = curDir + File.separatorChar + "test/org/apache/pig/test/data/GoldenFiles/";
+        File f = new File(outDir);
+        boolean didMakeDir = f.mkdirs();
+        /*if(!didMakeDir)
+            throw new Exception("Could not Create Directory " + outDir);*/
+    }
+    
+    private void rmrf(String dir) throws Exception{
+        File f = new File(dir);
+        File[] ls = f.listFiles();
+        for (File file : ls) {
+            if(file.isFile())
+                file.delete();
+            else
+                rmrf(file.getPath());
+        }
+        boolean isDeleted = f.delete();
+        if(!isDeleted)
+            throw new Exception("Could not Delete Directory" + dir);
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+        rmrf(outDir);
+    }
+    
+    private void generateInput(int numTuples) throws ExecException{
+        
+        DataBag inpDb = GenRandomData.genRandSmallTupDataBag(r, numTuples, 1000);
+        
+        POProject proj = new POProject(new OperatorKey("", r.nextLong()));
+        Tuple t = new DefaultTuple();
+        t.append(inpDb);
+        proj.attachInput(t);
+        proj.setColumn(0);
+        proj.setOverloaded(true);
+        proj.setResultType(DataType.TUPLE);
+        
+        List<PhysicalOperator> inps = new ArrayList<PhysicalOperator>();
+        inps.add(proj);
+        
+        POStore str = new POStore(new OperatorKey("", r.nextLong()));
+        str.setInputs(inps);
+        
+        FileSpec fSpec = new FileSpec(ldFile,PigStorage.class.getName());
+        
+        str.setSFile(fSpec);
+        str.setPc(pc);
+        str.store();
+    }
+    
+    private void setUp1(boolean gen) throws Exception {
+        
+        ldFile = "file:" + inpDir + "jsTst1.txt";
+        expFile = ldFile;
+        stFile = "file:" + outDir + "jsTst1";
+        grpName = "jobSubTst1";
+        
+        if(gen){
+            generateInput(100);
+            return;
+        }
+        
+        FileSpec LFSpec = new FileSpec(ldFile,PigStorage.class.getName());
+        FileSpec SFSpec = new FileSpec(stFile,PigStorage.class.getName());
+
+        POLoad ld = new POLoad(new OperatorKey("", r.nextLong()));
+        POStore st = new POStore(new OperatorKey("", r.nextLong()));
+        ld.setPc(pc);
+        ld.setLFile(LFSpec);
+        st.setPc(pc);
+        st.setSFile(SFSpec);
+        
+        php.add(ld);
+        php.add(st);
+        php.connect(ld, st);
+     }
+
+    @Test
+    public void testCompile1() throws Exception {
+        boolean gen = false;
+
+        setUp1(gen);
+        
+        if(gen)
+            return;
+
+        submit();
+        
+        assertEquals(true, FileLocalizer.fileExists(stFile, pc));
+        
+        FileSpec fSpecExp = new FileSpec(expFile,PigStorage.class.getName());
+        FileSpec fSpecAct = new FileSpec(stFile,PigStorage.class.getName());
+        
+        assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
+    }
+    
+    private void setUp2(boolean gen) throws Exception {
+        ldFile = "file:" + inpDir + "jsTst2.txt";
+        expFile = ldFile;
+        stFile = "file:" + outDir + "jsTst2";
+        grpName = "jobSubTst2";
+        
+        if(gen){
+            generateInput(1000);
+            return;
+        }
+        
+        FileSpec LFSpec = new FileSpec(ldFile,PigStorage.class.getName());
+        FileSpec SFSpec = new FileSpec(stFile,PigStorage.class.getName());
+
+        POLoad ld = new POLoad(new OperatorKey("", r.nextLong()));
+        POStore st = new POStore(new OperatorKey("", r.nextLong()));
+        ld.setPc(pc);
+        ld.setLFile(LFSpec);
+        st.setPc(pc);
+        st.setSFile(SFSpec);
+        
+        php.add(ld);
+        php.add(st);
+        php.connect(ld, st);
+     }
+
+    @Test
+    public void testCompile2() throws Exception {
+        boolean gen = false;
+
+        setUp2(gen);
+        
+        if(gen)
+            return;
+
+        submit();
+        
+        assertEquals(true, FileLocalizer.fileExists(stFile, pc));
+        
+        FileSpec fSpecExp = new FileSpec(expFile,PigStorage.class.getName());
+        FileSpec fSpecAct = new FileSpec(stFile,PigStorage.class.getName());
+        
+        assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
+    }
+    
+    private void setUp3(boolean gen) throws Exception {
+        ldFile = "file:" + inpDir + "jsTst1.txt";
+        expFile = "file:" + golDir + "jsTst3";
+        stFile = "file:" + outDir + "jsTst3";
+        grpName = "jobSubTst3";
+        
+        if(gen){
+            generateInput(1000);
+            return;
+        }
+        
+        FileSpec LFSpec = new FileSpec(ldFile,PigStorage.class.getName());
+        FileSpec SFSpec = new FileSpec(stFile,PigStorage.class.getName());
+
+        POLoad ld = new POLoad(new OperatorKey("", r.nextLong()));
+        POStore st = new POStore(new OperatorKey("", r.nextLong()));
+        ld.setPc(pc);
+        ld.setLFile(LFSpec);
+        st.setPc(pc);
+        st.setSFile(SFSpec);
+        
+        int[] flds = {0,1};
+        Tuple sample = new DefaultTuple();
+        sample.append(new String("S"));
+        sample.append(new Integer("10"));
+        
+        POForEach fe = GenPhyOp.topForEachOPWithPlan(flds , sample);
+        
+        POFilter fl = GenPhyOp.topFilterOpWithProj(1, 500, GenPhyOp.LT);
+        
+        php.add(ld);
+        php.add(fe);
+        php.connect(ld, fe);
+        
+        php.add(fl);
+        php.connect(fe, fl);
+        
+        php.add(st);
+        php.connect(fl, st);
+     }
+
+    @Test
+    public void testCompile3() throws Exception {
+        boolean gen = false;
+
+        setUp3(gen);
+        
+        if(gen)
+            return;
+
+        submit();
+        
+        assertEquals(true, FileLocalizer.fileExists(stFile, pc));
+        
+        FileSpec fSpecExp = new FileSpec(expFile,PigStorage.class.getName()+"(',')");
+        FileSpec fSpecAct = new FileSpec(stFile,PigStorage.class.getName());
+        
+        assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
+    }
+    
+    private void setUp4(boolean gen) throws Exception {
+        ldFile = "file:" + inpDir + "jsTst1.txt";
+        expFile = "file:" + golDir + "jsTst4";
+        stFile = "file:" + outDir + "jsTst4";
+        grpName = "jobSubTst4";
+        
+        if(gen){
+            generateInput(1000);
+            return;
+        }
+        
+        FileSpec LFSpec = new FileSpec(ldFile,PigStorage.class.getName());
+        FileSpec SFSpec = new FileSpec(stFile,PigStorage.class.getName());
+
+        POLoad ld = new POLoad(new OperatorKey("", r.nextLong()));
+        POStore st = new POStore(new OperatorKey("", r.nextLong()));
+        ld.setPc(pc);
+        ld.setLFile(LFSpec);
+        st.setPc(pc);
+        st.setSFile(SFSpec);
+        
+        POSplit spl = GenPhyOp.topSplitOp();
+        POFilter fl1 = GenPhyOp.topFilterOpWithProjWithCast(1, 200, GenPhyOp.LT);
+        POFilter fl2 = GenPhyOp.topFilterOpWithProjWithCast(1, 800, GenPhyOp.GT);
+        
+        POUnion un = GenPhyOp.topUnionOp();
+        
+        php.add(ld);
+        php.add(spl);
+        php.connect(ld, spl);
+        
+        php.add(fl1);
+        php.connect(spl, fl1);
+        
+        php.add(fl2);
+        php.connect(spl, fl2);
+        
+        php.add(un);
+        php.connect(fl1, un);
+        php.connect(fl2, un);
+        
+        php.add(st);
+        php.connect(un, st);
+     }
+
+    @Test
+    public void testCompile4() throws Exception {
+        boolean gen = false;
+
+        setUp4(gen);
+        
+        if(gen)
+            return;
+        
+        submit();
+        
+        assertEquals(true, FileLocalizer.fileExists(stFile, pc));
+        
+        FileSpec fSpecExp = new FileSpec(expFile,PigStorage.class.getName()+"(',')");
+        FileSpec fSpecAct = new FileSpec(stFile,PigStorage.class.getName());
+        
+        assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
+        
+    }
+    
+    private void setUp5(boolean gen) throws Exception {
+        ldFile = "file:" + inpDir + "jsTst5.txt";
+        expFile = ldFile;
+        stFile = "file:" + outDir + "jsTst5";
+        grpName = "jobSubTst5";
+        
+        if(gen){
+            generateInput(1000);
+            return;
+        }
+        
+        FileSpec LFSpec = new FileSpec(ldFile,PigStorage.class.getName()+"(',')");
+        FileSpec SFSpec = new FileSpec(stFile,PigStorage.class.getName());
+
+        POLoad ld = new POLoad(new OperatorKey("", r.nextLong()));
+        POStore st = new POStore(new OperatorKey("", r.nextLong()));
+        ld.setPc(pc);
+        ld.setLFile(LFSpec);
+        st.setPc(pc);
+        st.setSFile(SFSpec);
+        
+        Tuple sample = new DefaultTuple();
+        sample.append("S");
+        sample.append(1);
+        POLocalRearrange lr = GenPhyOp.topLocalRearrangeOPWithPlan(0, 1, sample);
+        
+        POGlobalRearrange gr = GenPhyOp.topGlobalRearrangeOp();
+        
+        POPackage pk = GenPhyOp.topPackageOp();
+        pk.setKeyType(DataType.INTEGER);
+        pk.setNumInps(1);
+        boolean[] inner = {false}; 
+        pk.setInner(inner);
+        
+        POForEach fe = GenPhyOp.topForEachOPWithPlan(1);
+        
+        php.add(ld);
+        php.add(lr);
+        php.connect(ld, lr);
+        
+        php.add(gr);
+        php.connect(lr, gr);
+        
+        php.add(pk);
+        php.connect(gr, pk);
+        
+        php.add(fe);
+        php.connect(pk, fe);
+        
+        php.add(st);
+        php.connect(fe, st);
+     }
+
+    @Test
+    public void testCompile5() throws Exception {
+        boolean gen = false;
+
+        setUp5(gen);
+        
+        if(gen)
+            return;
+        
+        submit();
+        
+        assertEquals(true, FileLocalizer.fileExists(stFile, pc));
+        
+        FileSpec fSpecExp = new FileSpec(expFile,PigStorage.class.getName()+"(',')");
+        FileSpec fSpecAct = new FileSpec(stFile,PigStorage.class.getName());
+        
+        assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
+        
+    }
+    
+    private void submit() throws Exception{
+        assertEquals(true, FileLocalizer.fileExists(ldFile, pc));
+        LocalLauncher ll = new LocalLauncher();
+        ll.launchPig(php, grpName, pc);  
+    }
+}

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java Thu May 15 18:09:30 2008
@@ -30,11 +30,15 @@
 import org.apache.pig.data.DefaultTuple;
 import org.apache.pig.data.IndexedTuple;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.test.utils.GenPhyOp;
 import org.apache.pig.test.utils.GenRandomData;
 import org.apache.pig.test.utils.TestHelper;
@@ -58,6 +62,9 @@
     public void setUp() throws Exception {
         Random r = new Random();
         db = GenRandomData.genRandSmallTupDataBag(r, 10, 100);
+    }
+    
+    private void setUp1() throws PlanException, ExecException{
         lr = GenPhyOp.topLocalRearrangeOPWithPlanPlain(0,0,db.iterator().next());
         POProject proj = GenPhyOp.exprProject();
         proj.setColumn(0);
@@ -76,7 +83,8 @@
     }
 
     @Test
-    public void testGetNextTuple() throws ExecException, IOException {
+    public void testGetNextTuple1() throws ExecException, PlanException {
+        setUp1();
         int size=0;
         for(Result res=lr.getNext(t);res.returnStatus!=POStatus.STATUS_EOP;res=lr.getNext(t)){
             Tuple t = (Tuple)res.result;
@@ -96,5 +104,50 @@
         //check if all the tuples in the input are generated
         assertEquals(true, size==db.size());
     }
+    
+    private void setUp2() throws PlanException, ExecException{
+        lr = GenPhyOp.topLocalRearrangeOPWithPlanPlain(0,0,db.iterator().next());
+        List<ExprPlan> plans = lr.getPlans();
+        POLocalRearrange lrT = GenPhyOp.topLocalRearrangeOPWithPlanPlain(0, 1, db.iterator().next());
+        List<ExprPlan> plansT = lrT.getPlans();
+        plans.add(plansT.get(0));
+        lr.setPlans(plans);
+        
+        POProject proj = GenPhyOp.exprProject();
+        proj.setColumn(0);
+        proj.setResultType(DataType.TUPLE);
+        proj.setOverloaded(true);
+        Tuple t = new DefaultTuple();
+        t.append(db);
+        proj.attachInput(t);
+        List<PhysicalOperator> inputs = new ArrayList<PhysicalOperator>();
+        inputs.add(proj);
+        lr.setInputs(inputs);
+    }
+    
+    @Test
+    public void testGetNextTuple2() throws ExecException, PlanException {
+        setUp2();
+        int size=0;
+        for(Result res=lr.getNext(t);res.returnStatus!=POStatus.STATUS_EOP;res=lr.getNext(t)){
+            Tuple t = (Tuple)res.result;
+            IndexedTuple it = (IndexedTuple)t.get(1);
+            //Check if the index is same as input index
+            assertEquals((float)0, (float)it.index, 0.01f);
+            
+            //Check if the input baf contains the value tuple
+            assertEquals(true, TestHelper.bagContains(db, it.toTuple()));
+            
+            //Check if the input key and the output key are same
+            Tuple inpKey = TupleFactory.getInstance().newTuple(2); 
+            inpKey.set(0,it.toTuple().get(0));
+            inpKey.set(1,it.toTuple().get(1));
+            assertEquals(true, inpKey.compareTo((Tuple)t.get(0))==0);
+            ++size;
+        }
+        
+        //check if all the tuples in the input are generated
+        assertEquals(true, size==db.size());
+    }
 
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java Thu May 15 18:09:30 2008
@@ -17,8 +17,6 @@
  */
 package org.apache.pig.test;
 
-import static org.junit.Assert.assertEquals;
-
 import java.io.ByteArrayOutputStream;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -29,23 +27,39 @@
 import java.util.List;
 import java.util.Random;
 
+import org.apache.pig.ComparisonFunc;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.AVG;
+import org.apache.pig.builtin.SUM;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.FindQuantiles;
+import org.apache.pig.impl.builtin.GFCross;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
 import org.apache.pig.impl.mapReduceLayer.MRCompiler;
 import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
 import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.physicalLayer.plans.PlanPrinter;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POSort;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POSplit;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
-import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POUserComparisonFunc;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POUserFunc;
 import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.test.TestPOSort.WeirdComparator;
 import org.apache.pig.test.utils.GenPhyOp;
 import org.junit.After;
 import org.junit.Before;
@@ -54,7 +68,7 @@
 public class TestMRCompiler extends junit.framework.TestCase {
     static PhysicalPlan<PhysicalOperator> php = new PhysicalPlan<PhysicalOperator>();
 
-    MiniCluster cluster = MiniCluster.buildCluster();
+//    MiniCluster cluster = MiniCluster.buildCluster();
     
     static PigContext pc;
 
@@ -64,6 +78,7 @@
 
     static final long SEED = 1013;
     
+    static Random r;
     static{
         pc = new PigContext();
         try {
@@ -72,14 +87,16 @@
             // TODO Auto-generated catch block
             e.printStackTrace();
         }
+        r = new Random(SEED);
     }
 
     @Before
     public void setUp() throws ExecException {
-        GenPhyOp.setR(new Random(SEED));
+        GenPhyOp.setR(r);
         
         GenPhyOp.setPc(pc);
-        int numTests = 14;
+        int numTests = 15;
+//        int numTests = 9;
         tests = new String[numTests];
         int cnt = -1;
 
@@ -89,6 +106,7 @@
         tests[++cnt] = "intTestRun2";
         for (int i = 1; i <= 3; i++)
             tests[++cnt] = "intTestSpl" + i;
+        tests[++cnt] = "intTestSortUDF1";
     }
 
     @After
@@ -112,7 +130,9 @@
         PhysicalPlan<PhysicalOperator> part1 = new PhysicalPlan<PhysicalOperator>();
         POLoad lC = GenPhyOp.topLoadOp();
         POFilter fC = GenPhyOp.topFilterOp();
+        fC.setRequestedParallelism(20);
         POLocalRearrange lrC = GenPhyOp.topLocalRearrangeOp();
+        lrC.setRequestedParallelism(10);
         POGlobalRearrange grC = GenPhyOp.topGlobalRearrangeOp();
         POPackage pkC = GenPhyOp.topPackageOp();
         part1.add(lC);
@@ -126,7 +146,9 @@
         part1.connect(grC, pkC);
 
         POPackage pkD = GenPhyOp.topPackageOp();
+        pkD.setRequestedParallelism(20);
         POLocalRearrange lrD = GenPhyOp.topLocalRearrangeOp();
+        lrD.setRequestedParallelism(30);
         POGlobalRearrange grD = GenPhyOp.topGlobalRearrangeOp();
         POLoad lD = GenPhyOp.topLoadOp();
         part1.add(lD);
@@ -399,7 +421,9 @@
         php.connect(lA, spl);
 
         POFilter fl1 = GenPhyOp.topFilterOp();
+        fl1.setRequestedParallelism(10);
         POFilter fl2 = GenPhyOp.topFilterOp();
+        fl2.setRequestedParallelism(20);
         php.add(fl1);
         php.add(fl2);
         php.connect(spl, fl1);
@@ -413,8 +437,11 @@
         php.connect(fl2, sp21);
 
         POFilter fl11 = GenPhyOp.topFilterOp();
+        fl11.setRequestedParallelism(10);
         POFilter fl21 = GenPhyOp.topFilterOp();
+        fl21.setRequestedParallelism(20);
         POFilter fl22 = GenPhyOp.topFilterOp();
+        fl22.setRequestedParallelism(30);
         php.add(fl11);
         php.add(fl21);
         php.add(fl22);
@@ -423,8 +450,11 @@
         php.connect(sp21, fl22);
 
         POLocalRearrange lr1 = GenPhyOp.topLocalRearrangeOp();
+        lr1.setRequestedParallelism(40);
         POLocalRearrange lr21 = GenPhyOp.topLocalRearrangeOp();
+        lr21.setRequestedParallelism(15);
         POLocalRearrange lr22 = GenPhyOp.topLocalRearrangeOp();
+        lr22.setRequestedParallelism(35);
         php.add(lr1);
         php.add(lr21);
         php.add(lr22);
@@ -436,13 +466,16 @@
         php.addAsLeaf(gr);
 
         POPackage pk = GenPhyOp.topPackageOp();
+        pk.setRequestedParallelism(25);
         php.addAsLeaf(pk);
 
         POSplit sp2 = GenPhyOp.topSplitOp();
         php.addAsLeaf(sp2);
 
         POFilter fl3 = GenPhyOp.topFilterOp();
+        fl3.setRequestedParallelism(100);
         POFilter fl4 = GenPhyOp.topFilterOp();
+        fl4.setRequestedParallelism(80);
         php.add(fl3);
         php.add(fl4);
         php.connect(sp2, fl3);
@@ -687,6 +720,86 @@
         POStore st = GenPhyOp.topStoreOp();
         php.addAsLeaf(st);
     }
+    
+    public static void intTestSortUDF1() throws PlanException, ExecException{
+        php = new PhysicalPlan<PhysicalOperator>();
+        PhysicalPlan<PhysicalOperator> ldFil1 = GenPhyOp.loadedFilter();
+        php.merge(ldFil1);
+        
+        String funcName = WeirdComparator.class.getName();
+        POUserFunc comparator = new POUserComparisonFunc(
+                new OperatorKey("", r.nextLong()), -1, null, funcName);
+        POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, ldFil1.getLeaves(),
+                null, null, comparator);
+        sort.setRequestedParallelism(20);
+        ExprPlan nesSortPlan = new ExprPlan();
+        POProject topPrj = new POProject(new OperatorKey("", r.nextLong()));
+        topPrj.setColumn(1);
+        topPrj.setOverloaded(true);
+        topPrj.setResultType(DataType.TUPLE);
+        nesSortPlan.add(topPrj);
+        
+        POProject prjStar2 = new POProject(new OperatorKey("", r.nextLong()));
+        prjStar2.setResultType(DataType.TUPLE);
+        prjStar2.setStar(true);
+        nesSortPlan.add(prjStar2);
+        
+        nesSortPlan.connect(topPrj, prjStar2);
+        List<ExprPlan> nesSortPlanLst = new ArrayList<ExprPlan>();
+        nesSortPlanLst.add(nesSortPlan);
+        
+        sort.setSortPlans(nesSortPlanLst);
+        
+        php.add(sort);
+        php.connect(ldFil1.getLeaves().get(0), sort);
+        
+        List<String> udfs = new ArrayList<String>();
+        udfs.add(FindQuantiles.class.getName());
+        udfs.add(SUM.class.getName());
+        POForEach fe3 = GenPhyOp.topForEachOPWithUDF(udfs);
+        php.add(fe3);
+        php.connect(sort, fe3);
+        
+        PhysicalPlan<PhysicalOperator> grpChain1 = GenPhyOp.grpChain();
+        php.merge(grpChain1);
+        php.connect(fe3,grpChain1.getRoots().get(0));
+        
+        udfs.clear();
+        udfs.add(AVG.class.getName());
+        POForEach fe4 = GenPhyOp.topForEachOPWithUDF(udfs);
+        php.addAsLeaf(fe4);
+        
+        PhysicalPlan<PhysicalOperator> grpChain2 = GenPhyOp.grpChain();
+        php.merge(grpChain2);
+        php.connect(fe4,grpChain2.getRoots().get(0));
+
+        udfs.clear();
+        udfs.add(GFCross.class.getName());
+        POForEach fe5 = GenPhyOp.topForEachOPWithUDF(udfs);
+        php.addAsLeaf(fe5);
+        
+        POStore st = GenPhyOp.topStoreOp();
+        php.addAsLeaf(st);
+    }
+    
+    public static class WeirdComparator extends ComparisonFunc {
+
+        @Override
+        public int compare(Tuple t1, Tuple t2) {
+            // TODO Auto-generated method stub
+            int result = 0;
+            try {
+                int i1 = (Integer) t1.get(1);
+                int i2 = (Integer) t2.get(1);
+                result = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
+            } catch (ExecException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+            return result;
+        }
+
+    }
 
     public static void generate() throws SecurityException, NoSuchMethodException,
             IllegalArgumentException, IllegalAccessException,
@@ -713,7 +826,7 @@
 
             ppp.print(baos);
 
-            FileOutputStream fos = new FileOutputStream("intTest/org/apache/pig/test/data/GoldenFiles/MRC"
+            FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/MRC"
                     + (i + 1) + ".gld");
             fos.write(baos.toByteArray());
 

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java Thu May 15 18:09:30 2008
@@ -34,191 +34,191 @@
 import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.PORead;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POSort;
-import org.apache.pig.impl.physicalLayer.topLevelOperators.POUserComparisonFunc;
-import org.apache.pig.impl.physicalLayer.topLevelOperators.POUserFunc;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POUserComparisonFunc;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POUserFunc;
 import org.apache.pig.test.utils.GenRandomData;
 import org.junit.Test;
 
 public class TestPOSort extends TestCase {
-    Random r = new Random();
-    int MAX_TUPLES = 10;
+	Random r = new Random();
+	int MAX_TUPLES = 10;
 
-    @Test
-    public void testPOSortAscString() throws ExecException {
-        DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
-                MAX_TUPLES, 100);
-        List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
-        POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
-        pr1.setResultType(DataType.CHARARRAY);
-        ExprPlan expPlan = new ExprPlan();
-        expPlan.add(pr1);
-        sortPlans.add(expPlan);
-        List<Boolean> mAscCols = new LinkedList<Boolean>();
-        mAscCols.add(true);
-        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
-        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
-        inputs.add(read);
-        POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
-                sortPlans, mAscCols, null);
-        Tuple t = null;
-        Result res1 = sort.getNext(t);
-        // System.out.println(res1.result);
-        Result res2 = sort.getNext(t);
-        while (res2.returnStatus != POStatus.STATUS_EOP) {
-            Object i1 = ((Tuple) res1.result).get(0);
-            Object i2 = ((Tuple) res2.result).get(0);
-            int i = DataType.compare(i1, i2);
-            // System.out.println(res2.result + " i = " + i);
-            assertEquals(true, (i <= 0));
-            res1 = res2;
-            res2 = sort.getNext(t);
-        }
-    }
-
-    @Test
-    public void testPOSortDescString() throws ExecException {
-        DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
-                MAX_TUPLES, 100);
-        List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
-        POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
-        pr1.setResultType(DataType.CHARARRAY);
-        ExprPlan expPlan = new ExprPlan();
-        expPlan.add(pr1);
-        sortPlans.add(expPlan);
-        List<Boolean> mAscCols = new LinkedList<Boolean>();
-        mAscCols.add(false);
-        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
-        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
-        inputs.add(read);
-        POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
-                sortPlans, mAscCols, null);
-        Tuple t = null;
-        Result res1 = sort.getNext(t);
-        // System.out.println(res1.result);
-        Result res2 = sort.getNext(t);
-        while (res2.returnStatus != POStatus.STATUS_EOP) {
-            Object i1 = ((Tuple) res1.result).get(0);
-            Object i2 = ((Tuple) res2.result).get(0);
-            int i = DataType.compare(i1, i2);
-            // System.out.println(res2.result + " i = " + i);
-            assertEquals(true, (i >= 0));
-            res1 = res2;
-            res2 = sort.getNext(t);
-        }
-    }
-
-    @Test
-    public void testPOSortAsc() throws ExecException {
-        DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
-                MAX_TUPLES, 100);
-        List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
-        POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
-        pr1.setResultType(DataType.INTEGER);
-        ExprPlan expPlan = new ExprPlan();
-        expPlan.add(pr1);
-        sortPlans.add(expPlan);
-        List<Boolean> mAscCols = new LinkedList<Boolean>();
-        mAscCols.add(true);
-        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
-        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
-        inputs.add(read);
-        POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
-                sortPlans, mAscCols, null);
-        Tuple t = null;
-        Result res1 = sort.getNext(t);
-        // System.out.println(res1.result);
-        Result res2 = sort.getNext(t);
-        while (res2.returnStatus != POStatus.STATUS_EOP) {
-            Object i1 = ((Tuple) res1.result).get(1);
-            Object i2 = ((Tuple) res2.result).get(1);
-            int i = DataType.compare(i1, i2);
-            assertEquals(true, (i <= 0));
-            // System.out.println(res2.result);
-            res1 = res2;
-            res2 = sort.getNext(t);
-        }
-    }
-
-    @Test
-    public void testPOSortDesc() throws ExecException {
-        DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
-                MAX_TUPLES, 100);
-        List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
-        POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
-        pr1.setResultType(DataType.INTEGER);
-        ExprPlan expPlan = new ExprPlan();
-        expPlan.add(pr1);
-        sortPlans.add(expPlan);
-        List<Boolean> mAscCols = new LinkedList<Boolean>();
-        mAscCols.add(false);
-        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
-        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
-        inputs.add(read);
-        POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
-                sortPlans, mAscCols, null);
-        Tuple t = null;
-        Result res1 = sort.getNext(t);
-        // System.out.println(res1.result);
-        Result res2 = sort.getNext(t);
-        while (res2.returnStatus != POStatus.STATUS_EOP) {
-            Object i1 = ((Tuple) res1.result).get(1);
-            Object i2 = ((Tuple) res2.result).get(1);
-            int i = DataType.compare(i1, i2);
-            assertEquals(true, (i >= 0));
-            // System.out.println(res2.result);
-            res1 = res2;
-            res2 = sort.getNext(t);
-        }
-    }
-
-    @Test
-    public void testPOSortUDF() throws ExecException {
-        DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
-                MAX_TUPLES, 100);
-        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
-        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
-        inputs.add(read);
-        String funcName = WeirdComparator.class.getName() + "()";
-        /*POUserFunc comparator = new POUserFunc(
-                new OperatorKey("", r.nextLong()), -1, inputs, funcName);*/
-        POUserFunc comparator = new POUserComparisonFunc(
-                new OperatorKey("", r.nextLong()), -1, null, funcName);
-        POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
-                null, null, comparator);
-        Tuple t = null;
-        Result res1 = sort.getNext(t);
-        // System.out.println(res1.result);
-        Result res2 = sort.getNext(t);
-        while (res2.returnStatus != POStatus.STATUS_EOP) {
-            int i1 = (Integer) ((Tuple) res1.result).get(1);
-            int i2 = (Integer) ((Tuple) res2.result).get(1);
-            int i = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
-            assertEquals(true, (i <= 0));
-            System.out.println(i + " : " + res2.result);
-            res1 = res2;
-            res2 = sort.getNext(t);
-        }
-    }
-
-    // sorts values in ascending order of their distance from 50
-    public static class WeirdComparator extends ComparisonFunc {
-
-        @Override
-        public int compare(Tuple t1, Tuple t2) {
-            // TODO Auto-generated method stub
-            int result = 0;
-            try {
-                int i1 = (Integer) t1.get(1);
-                int i2 = (Integer) t2.get(1);
-                result = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
-            } catch (ExecException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
-            }
-            return result;
-        }
+	@Test
+	public void testPOSortAscString() throws ExecException {
+		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+				MAX_TUPLES, 100);
+		List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+		POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+		pr1.setResultType(DataType.CHARARRAY);
+		ExprPlan expPlan = new ExprPlan();
+		expPlan.add(pr1);
+		sortPlans.add(expPlan);
+		List<Boolean> mAscCols = new LinkedList<Boolean>();
+		mAscCols.add(true);
+		PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+		List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+		inputs.add(read);
+		POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+				sortPlans, mAscCols, null);
+		Tuple t = null;
+		Result res1 = sort.getNext(t);
+		// System.out.println(res1.result);
+		Result res2 = sort.getNext(t);
+		while (res2.returnStatus != POStatus.STATUS_EOP) {
+			Object i1 = ((Tuple) res1.result).get(0);
+			Object i2 = ((Tuple) res2.result).get(0);
+			int i = DataType.compare(i1, i2);
+			// System.out.println(res2.result + " i = " + i);
+			assertEquals(true, (i <= 0));
+			res1 = res2;
+			res2 = sort.getNext(t);
+		}
+	}
+
+	@Test
+	public void testPOSortDescString() throws ExecException {
+		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+				MAX_TUPLES, 100);
+		List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+		POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+		pr1.setResultType(DataType.CHARARRAY);
+		ExprPlan expPlan = new ExprPlan();
+		expPlan.add(pr1);
+		sortPlans.add(expPlan);
+		List<Boolean> mAscCols = new LinkedList<Boolean>();
+		mAscCols.add(false);
+		PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+		List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+		inputs.add(read);
+		POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+				sortPlans, mAscCols, null);
+		Tuple t = null;
+		Result res1 = sort.getNext(t);
+		// System.out.println(res1.result);
+		Result res2 = sort.getNext(t);
+		while (res2.returnStatus != POStatus.STATUS_EOP) {
+			Object i1 = ((Tuple) res1.result).get(0);
+			Object i2 = ((Tuple) res2.result).get(0);
+			int i = DataType.compare(i1, i2);
+			// System.out.println(res2.result + " i = " + i);
+			assertEquals(true, (i >= 0));
+			res1 = res2;
+			res2 = sort.getNext(t);
+		}
+	}
+
+	@Test
+	public void testPOSortAsc() throws ExecException {
+		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+				MAX_TUPLES, 100);
+		List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+		POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+		pr1.setResultType(DataType.INTEGER);
+		ExprPlan expPlan = new ExprPlan();
+		expPlan.add(pr1);
+		sortPlans.add(expPlan);
+		List<Boolean> mAscCols = new LinkedList<Boolean>();
+		mAscCols.add(true);
+		PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+		List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+		inputs.add(read);
+		POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+				sortPlans, mAscCols, null);
+		Tuple t = null;
+		Result res1 = sort.getNext(t);
+		// System.out.println(res1.result);
+		Result res2 = sort.getNext(t);
+		while (res2.returnStatus != POStatus.STATUS_EOP) {
+			Object i1 = ((Tuple) res1.result).get(1);
+			Object i2 = ((Tuple) res2.result).get(1);
+			int i = DataType.compare(i1, i2);
+			assertEquals(true, (i <= 0));
+			// System.out.println(res2.result);
+			res1 = res2;
+			res2 = sort.getNext(t);
+		}
+	}
+
+	@Test
+	public void testPOSortDesc() throws ExecException {
+		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+				MAX_TUPLES, 100);
+		List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+		POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+		pr1.setResultType(DataType.INTEGER);
+		ExprPlan expPlan = new ExprPlan();
+		expPlan.add(pr1);
+		sortPlans.add(expPlan);
+		List<Boolean> mAscCols = new LinkedList<Boolean>();
+		mAscCols.add(false);
+		PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+		List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+		inputs.add(read);
+		POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+				sortPlans, mAscCols, null);
+		Tuple t = null;
+		Result res1 = sort.getNext(t);
+		// System.out.println(res1.result);
+		Result res2 = sort.getNext(t);
+		while (res2.returnStatus != POStatus.STATUS_EOP) {
+			Object i1 = ((Tuple) res1.result).get(1);
+			Object i2 = ((Tuple) res2.result).get(1);
+			int i = DataType.compare(i1, i2);
+			assertEquals(true, (i >= 0));
+			// System.out.println(res2.result);
+			res1 = res2;
+			res2 = sort.getNext(t);
+		}
+	}
+
+	@Test
+	public void testPOSortUDF() throws ExecException {
+		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+				MAX_TUPLES, 100);
+		PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+		List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+		inputs.add(read);
+		String funcName = WeirdComparator.class.getName() + "()";
+		/*POUserFunc comparator = new POUserFunc(
+				new OperatorKey("", r.nextLong()), -1, inputs, funcName);*/
+		POUserFunc comparator = new POUserComparisonFunc(
+				new OperatorKey("", r.nextLong()), -1, null, funcName);
+		POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+				null, null, comparator);
+		Tuple t = null;
+		Result res1 = sort.getNext(t);
+		// System.out.println(res1.result);
+		Result res2 = sort.getNext(t);
+		while (res2.returnStatus != POStatus.STATUS_EOP) {
+			int i1 = (Integer) ((Tuple) res1.result).get(1);
+			int i2 = (Integer) ((Tuple) res2.result).get(1);
+			int i = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
+			assertEquals(true, (i <= 0));
+			System.out.println(i + " : " + res2.result);
+			res1 = res2;
+			res2 = sort.getNext(t);
+		}
+	}
+
+	// sorts values in ascending order of their distance from 50
+	public static class WeirdComparator extends ComparisonFunc {
+
+		@Override
+		public int compare(Tuple t1, Tuple t2) {
+			// TODO Auto-generated method stub
+			int result = 0;
+			try {
+				int i1 = (Integer) t1.get(1);
+				int i2 = (Integer) t2.get(1);
+				result = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
+			} catch (ExecException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			}
+			return result;
+		}
 
-    }
+	}
 }