You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pi...@apache.org on 2008/07/10 15:01:20 UTC
svn commit: r675558 - in /incubator/pig/branches/types:
src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java
test/org/apache/pig/test/TestPOSort.java
Author: pisong
Date: Thu Jul 10 06:01:19 2008
New Revision: 675558
URL: http://svn.apache.org/viewvc?rev=675558&view=rev
Log:
PIG-298 POSort Logic
Modified:
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java
incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java?rev=675558&r1=675557&r2=675558&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java Thu Jul 10 06:01:19 2008
@@ -137,16 +137,27 @@
if(res1.returnStatus != POStatus.STATUS_OK || res2.returnStatus != POStatus.STATUS_OK) {
log.error("Error processing the input in the expression plan : " + plan.toString());
} else {
- if(mAscCols.get(count ++))
+ if(mAscCols.get(count++)) {
ret = DataType.compare(res1.result, res2.result);
- else
- ret = DataType.compare(res2.result, res1.result);
+ // If they are not equal, return
+ // Otherwise, keep comparing the next one
+ if (ret != 0) {
+ return ret ;
+ }
+ }
+ else {
+ ret = DataType.compare(res2.result, res1.result);
+ if (ret != 0) {
+ return ret ;
+ }
+
+ }
+
}
} catch (ExecException e) {
log.error("Invalid result while executing the expression plan : " + plan.toString() + "\n" + e.getMessage());
}
-
}
return ret;
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java?rev=675558&r1=675557&r2=675558&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java Thu Jul 10 06:01:19 2008
@@ -22,12 +22,11 @@
import java.util.Random;
import junit.framework.TestCase;
+import junit.framework.Assert;
import org.apache.pig.ComparisonFunc;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
+import org.apache.pig.data.*;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.physicalLayer.PhysicalOperator;
import org.apache.pig.impl.physicalLayer.POStatus;
@@ -173,7 +172,164 @@
}
}
- @Test
+ /**
+ * Verifies a two-key sort with mixed directions. Sorting the input
+ * (1, 10)
+ * (3, 8)
+ * (2, 8)
+ *
+ * BY $1 DESC, $0 ASC
+ *
+ * should return
+ * (1, 10)
+ * (2, 8)
+ * (3, 8)
+ *
+ * @throws ExecException if raised by the operators or tuple accessors used here
+ */
+ @Test
+ public void testPOSortMixAscDesc1() throws ExecException {
+ DataBag input = DefaultBagFactory.getInstance().newDefaultBag() ;
+
+ Tuple t1 = DefaultTupleFactory.getInstance().newTuple() ;
+ t1.append(1);
+ t1.append(10);
+ input.add(t1);
+
+ Tuple t2 = DefaultTupleFactory.getInstance().newTuple() ;
+ t2.append(3);
+ t2.append(8);
+ input.add(t2);
+
+
+ Tuple t3 = DefaultTupleFactory.getInstance().newTuple() ;
+ t3.append(2);
+ t3.append(8);
+ input.add(t3);
+
+ List<PhysicalPlan> sortPlans = new LinkedList<PhysicalPlan>();
+ // Keys are added in BY-clause order ($1, then $0); the last POProject argument is presumably the column index.
+ POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+ pr1.setResultType(DataType.INTEGER);
+ PhysicalPlan expPlan1 = new PhysicalPlan();
+ expPlan1.add(pr1);
+ sortPlans.add(expPlan1);
+
+ POProject pr2 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+ pr2.setResultType(DataType.INTEGER);
+ PhysicalPlan expPlan2 = new PhysicalPlan();
+ expPlan2.add(pr2);
+ sortPlans.add(expPlan2);
+ // One direction flag per sort key, in the same order as sortPlans: false = DESC, true = ASC.
+ List<Boolean> mAscCols = new LinkedList<Boolean>();
+ mAscCols.add(false);
+ mAscCols.add(true);
+
+ PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+ List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+ inputs.add(read);
+
+ POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+ sortPlans, mAscCols, null);
+
+ Tuple t = null;
+ Result res ;
+ // output line 1 (expected value first, per the JUnit assertEquals(expected, actual) contract)
+ res = sort.getNext(t);
+ Assert.assertEquals(1, ((Tuple) res.result).get(0)) ;
+ Assert.assertEquals(10, ((Tuple) res.result).get(1)) ;
+ // output line 2
+ res = sort.getNext(t);
+ Assert.assertEquals(2, ((Tuple) res.result).get(0)) ;
+ Assert.assertEquals(8, ((Tuple) res.result).get(1)) ;
+ // output line 3
+ res = sort.getNext(t);
+ Assert.assertEquals(3, ((Tuple) res.result).get(0)) ;
+ Assert.assertEquals(8, ((Tuple) res.result).get(1)) ;
+
+ }
+
+
+ /**
+ * Verifies a two-key sort with mixed directions. Sorting the input
+ * (1, 2)
+ * (3, 5)
+ * (3, 8)
+ *
+ * BY $0 DESC, $1 ASC
+ *
+ * should return
+ * (3, 5)
+ * (3, 8)
+ * (1, 2)
+ *
+ * @throws ExecException if raised by the operators or tuple accessors used here
+ */
+
+ @Test
+ public void testPOSortMixAscDesc2() throws ExecException {
+ DataBag input = DefaultBagFactory.getInstance().newDefaultBag() ;
+
+ Tuple t1 = DefaultTupleFactory.getInstance().newTuple() ;
+ t1.append(1);
+ t1.append(2);
+ input.add(t1);
+
+ Tuple t2 = DefaultTupleFactory.getInstance().newTuple() ;
+ t2.append(3);
+ t2.append(5);
+ input.add(t2);
+
+
+ Tuple t3 = DefaultTupleFactory.getInstance().newTuple() ;
+ t3.append(3);
+ t3.append(8);
+ input.add(t3);
+
+ List<PhysicalPlan> sortPlans = new LinkedList<PhysicalPlan>();
+ // Keys are added in BY-clause order ($0, then $1); the last POProject argument is presumably the column index.
+ POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+ pr1.setResultType(DataType.INTEGER);
+ PhysicalPlan expPlan1 = new PhysicalPlan();
+ expPlan1.add(pr1);
+ sortPlans.add(expPlan1);
+
+ POProject pr2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+ pr2.setResultType(DataType.INTEGER);
+ PhysicalPlan expPlan2 = new PhysicalPlan();
+ expPlan2.add(pr2);
+ sortPlans.add(expPlan2);
+ // One direction flag per sort key, in the same order as sortPlans: false = DESC, true = ASC.
+ List<Boolean> mAscCols = new LinkedList<Boolean>();
+ mAscCols.add(false);
+ mAscCols.add(true);
+
+ PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+ List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+ inputs.add(read);
+
+ POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+ sortPlans, mAscCols, null);
+
+ Tuple t = null;
+ Result res ;
+ // output line 1 (expected value first, per the JUnit assertEquals(expected, actual) contract)
+ res = sort.getNext(t);
+ Assert.assertEquals(3, ((Tuple) res.result).get(0)) ;
+ Assert.assertEquals(5, ((Tuple) res.result).get(1)) ;
+ // output line 2
+ res = sort.getNext(t);
+ Assert.assertEquals(3, ((Tuple) res.result).get(0)) ;
+ Assert.assertEquals(8, ((Tuple) res.result).get(1)) ;
+ // output line 3
+ res = sort.getNext(t);
+ Assert.assertEquals(1, ((Tuple) res.result).get(0)) ;
+ Assert.assertEquals(2, ((Tuple) res.result).get(1)) ;
+
+ }
+
+
+ @Test
public void testPOSortUDF() throws ExecException {
DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
MAX_TUPLES, 100);