You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/10 20:55:55 UTC
svn commit: r693927 - in /incubator/pig/branches/types: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
test/org/apache/pig/test/
Author: olga
Date: Wed Sep 10 11:55:53 2008
New Revision: 693927
URL: http://svn.apache.org/viewvc?rev=693927&view=rev
Log:
PIG-402, PIG-415: order-related bugs
Modified:
incubator/pig/branches/types/CHANGES.txt
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=693927&r1=693926&r2=693927&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Wed Sep 10 11:55:53 2008
@@ -183,3 +183,9 @@
PIG-398: Expressions not allowed inside foreach (sms via olgan)
PIG-418: divide by 0 problem
+
+ PIG-402: order by with user comparator (shravanmn via olgan)
+
+ PIG-415: problem with comparators (shravanmn via olgan)
+
+
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=693927&r1=693926&r2=693927&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Sep 10 11:55:53 2008
@@ -287,7 +287,7 @@
op.setParentPlan(plans[i]);
}
}
-
+ POPackage pack = null;
if(mro.reducePlan.isEmpty()){
//MapOnly Job
jobConf.setMapperClass(PigMapOnly.Map.class);
@@ -310,7 +310,7 @@
jobConf.set("pig.combinePlan", ObjectSerializer.serialize(mro.combinePlan));
jobConf.set("pig.combine.package", ObjectSerializer.serialize(combPack));
}
- POPackage pack = (POPackage)mro.reducePlan.getRoots().get(0);
+ pack = (POPackage)mro.reducePlan.getRoots().get(0);
mro.reducePlan.remove(pack);
jobConf.setMapperClass(PigMapReduce.Map.class);
jobConf.setReducerClass(PigMapReduce.Reduce.class);
@@ -345,6 +345,10 @@
String compFuncSpec = mro.UDFs.get(0);
Class comparator = PigContext.resolveClassName(compFuncSpec);
if(ComparisonFunc.class.isAssignableFrom(comparator)) {
+ jobConf.setMapperClass(PigMapReduce.MapWithComparator.class);
+ pack.setKeyType(DataType.TUPLE);
+ jobConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
+ jobConf.setOutputKeyClass(TupleFactory.getInstance().tupleClass());
jobConf.setOutputKeyComparatorClass(comparator);
}
} else {
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java?rev=693927&r1=693926&r2=693927&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java Wed Sep 10 11:55:53 2008
@@ -66,7 +66,7 @@
// If either are null, handle differently.
if (b1[s1] == NullableBytesWritable.NOTNULL &&
b2[s2] == NullableBytesWritable.NOTNULL) {
- rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+ rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1);
} else {
// For sorting purposes two nulls are equal.
if (b1[s1] == NullableBytesWritable.NULL &&
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java?rev=693927&r1=693926&r2=693927&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java Wed Sep 10 11:55:53 2008
@@ -65,7 +65,7 @@
// If either are null, handle differently.
if (b1[s1] == NullableDoubleWritable.NOTNULL &&
b2[s2] == NullableDoubleWritable.NOTNULL) {
- rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+ rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1);
} else {
// For sorting purposes two nulls are equal.
if (b1[s1] == NullableDoubleWritable.NULL &&
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java?rev=693927&r1=693926&r2=693927&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java Wed Sep 10 11:55:53 2008
@@ -66,7 +66,7 @@
// If either are null, handle differently.
if (b1[s1] == NullableFloatWritable.NOTNULL &&
b2[s2] == NullableFloatWritable.NOTNULL) {
- rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+ rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1);
} else {
// For sorting purposes two nulls are equal.
if (b1[s1] == NullableFloatWritable.NULL &&
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java?rev=693927&r1=693926&r2=693927&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java Wed Sep 10 11:55:53 2008
@@ -66,7 +66,7 @@
// If either are null, handle differently.
if (b1[s1] == NullableIntWritable.NOTNULL &&
b2[s2] == NullableIntWritable.NOTNULL) {
- rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+ rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1);
} else {
// For sorting purposes two nulls are equal.
if (b1[s1] == NullableIntWritable.NULL &&
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java?rev=693927&r1=693926&r2=693927&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java Wed Sep 10 11:55:53 2008
@@ -66,7 +66,7 @@
// If either are null, handle differently.
if (b1[s1] == NullableLongWritable.NOTNULL &&
b2[s2] == NullableLongWritable.NOTNULL) {
- rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+ rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1);
} else {
// For sorting purposes two nulls are equal.
if (b1[s1] == NullableLongWritable.NULL &&
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=693927&r1=693926&r2=693927&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Wed Sep 10 11:55:53 2008
@@ -16,6 +16,7 @@
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.TargetedTuple;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -27,12 +28,13 @@
public abstract class PigMapBase extends MapReduceBase{
private final Log log = LogFactory.getLog(getClass());
-
+
protected byte keyType;
//Map Plan
protected PhysicalPlan mp;
+ protected TupleFactory tf = TupleFactory.getInstance();
OutputCollector<WritableComparable, Writable> outputCollector;
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=693927&r1=693926&r2=693927&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Wed Sep 10 11:55:53 2008
@@ -83,6 +83,20 @@
oc.collect(wcKey, it);
}
}
+
+ public static class MapWithComparator extends PigMapBase implements
+ Mapper<Text, TargetedTuple, WritableComparable, Writable> {
+
+ @Override
+ public void collect(OutputCollector<WritableComparable, Writable> oc,
+ Tuple tuple) throws ExecException, IOException {
+ Object key = tuple.get(0);
+ Tuple keyTuple = tf.newTuple(1);
+ keyTuple.set(0, key);
+ IndexedTuple it = (IndexedTuple) tuple.get(1);
+ oc.collect(keyTuple, it);
+ }
+ }
public static class Reduce extends MapReduceBase
implements
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java?rev=693927&r1=693926&r2=693927&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java Wed Sep 10 11:55:53 2008
@@ -66,7 +66,7 @@
// If either are null, handle differently.
if (b1[s1] == NullableText.NOTNULL &&
b2[s2] == NullableText.NOTNULL) {
- rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+ rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1);
} else {
// For sorting purposes two nulls are equal.
if (b1[s1] == NullableText.NULL &&
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=693927&r1=693926&r2=693927&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Wed Sep 10 11:55:53 2008
@@ -34,6 +34,7 @@
import org.junit.Before;
import org.junit.Test;
+import org.apache.pig.ComparisonFunc;
import org.apache.pig.EvalFunc;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
@@ -61,6 +62,7 @@
public void setUp() throws Exception{
FileLocalizer.setR(new Random());
pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+// pigServer = new PigServer(ExecType.LOCAL);
}
static public class MyBagFunction extends EvalFunc<DataBag>{
@@ -306,16 +308,29 @@
@Test
public void testSort() throws Exception{
- testSortDistinct(false);
+ testSortDistinct(false, false);
+ }
+
+ @Test
+ public void testSortWithUDF() throws Exception{
+ testSortDistinct(false, true);
}
@Test
public void testDistinct() throws Exception{
- testSortDistinct(true);
+ testSortDistinct(true, false);
}
+
+ public static class TupComp extends ComparisonFunc {
- private void testSortDistinct(boolean eliminateDuplicates) throws Exception{
+ @Override
+ public int compare(Tuple t1, Tuple t2) {
+ return t1.compareTo(t2);
+ }
+ }
+
+ private void testSortDistinct(boolean eliminateDuplicates, boolean useUDF) throws Exception{
int LOOP_SIZE = 1024*16;
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
@@ -330,7 +345,10 @@
if (eliminateDuplicates){
pigServer.registerQuery("B = DISTINCT (FOREACH A GENERATE $0) PARALLEL 10;");
}else{
- pigServer.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
+ if(!useUDF)
+ pigServer.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
+ else
+ pigServer.registerQuery("B = ORDER A BY $0 using " + TupComp.class.getName() + ";");
}
pigServer.store("B", tmpOutputFile);
@@ -355,7 +373,7 @@
}
- public void testNestedPlan() throws Exception{
+ /*public void testNestedPlan() throws Exception{
int LOOP_COUNT = 10;
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
@@ -464,7 +482,7 @@
++numIdentity;
}
assertEquals(5, numIdentity);
- }
+ }*/
}