You are viewing a plain-text version of this content. The canonical link to it (originally a hyperlink on the word "here") was not preserved in this text version.
Posted to commits@pig.apache.org by ro...@apache.org on 2015/08/12 01:01:11 UTC
svn commit: r1695396 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/tez/
src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/
test/e2e/pig/tests/
Author: rohini
Date: Tue Aug 11 23:01:10 2015
New Revision: 1695396
URL: http://svn.apache.org/r1695396
Log:
PIG-4627: [Pig on Tez] Self join does not handle null values correctly (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
pig/trunk/test/e2e/pig/tests/multiquery.conf
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Aug 11 23:01:10 2015
@@ -38,6 +38,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4627: [Pig on Tez] Self join does not handle null values correctly (rohini)
+
PIG-4644: PORelationToExprProject.clone() is broken (erwaman via rohini)
PIG-4650: ant mvn-deploy target is broken (daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java Tue Aug 11 23:01:10 2015
@@ -25,7 +25,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.backend.hadoop.BigDecimalWritable;
import org.apache.pig.impl.io.NullableBigDecimalWritable;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -74,8 +73,10 @@ public class PigBigDecimalRawComparator
if (b1[s1] == 0 && b2[s2] == 0) {
rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
} else {
- // For sorting purposes two nulls are equal.
- if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+ // Two nulls are equal if indices are same
+ if (b1[s1] != 0 && b2[s2] != 0) {
+ rc = b1[s1 + 1] - b2[s2 + 1];
+ }
else if (b1[s1] != 0) rc = -1;
else rc = 1;
}
@@ -93,8 +94,10 @@ public class PigBigDecimalRawComparator
if (!ndw1.isNull() && !ndw2.isNull()) {
rc = ((BigDecimal)ndw1.getValueAsPigType()).compareTo((BigDecimal)ndw2.getValueAsPigType());
} else {
- // For sorting purposes two nulls are equal.
- if (ndw1.isNull() && ndw2.isNull()) rc = 0;
+ // Two nulls are equal if indices are same
+ if (ndw1.isNull() && ndw2.isNull()) {
+ rc = ndw1.getIndex() - ndw2.getIndex();
+ }
else if (ndw1.isNull()) rc = -1;
else rc = 1;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java Tue Aug 11 23:01:10 2015
@@ -25,7 +25,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.backend.hadoop.BigIntegerWritable;
import org.apache.pig.impl.io.NullableBigIntegerWritable;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -74,8 +73,10 @@ public class PigBigIntegerRawComparator
if (b1[s1] == 0 && b2[s2] == 0) {
rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
} else {
- // For sorting purposes two nulls are equal.
- if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+ // Two nulls are equal if indices are same
+ if (b1[s1] != 0 && b2[s2] != 0) {
+ rc = b1[s1 + 1] - b2[s2 + 1];
+ }
else if (b1[s1] != 0) rc = -1;
else rc = 1;
}
@@ -93,8 +94,10 @@ public class PigBigIntegerRawComparator
if (!ndw1.isNull() && !ndw2.isNull()) {
rc = ((BigInteger)ndw1.getValueAsPigType()).compareTo((BigInteger)ndw2.getValueAsPigType());
} else {
- // For sorting purposes two nulls are equal.
- if (ndw1.isNull() && ndw2.isNull()) rc = 0;
+ // Two nulls are equal if indices are same
+ if (ndw1.isNull() && ndw2.isNull()) {
+ rc = ndw1.getIndex() - ndw2.getIndex();
+ }
else if (ndw1.isNull()) rc = -1;
else rc = 1;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java Tue Aug 11 23:01:10 2015
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configurab
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.impl.io.NullableBooleanWritable;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -39,6 +38,7 @@ public class PigBooleanRawComparator ext
super(NullableBooleanWritable.class);
mWrappedComp = new BooleanWritable.Comparator();
}
+ @Override
public void setConf(Configuration conf) {
try {
mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
@@ -54,6 +54,7 @@ public class PigBooleanRawComparator ext
}
}
+ @Override
public Configuration getConf() {
return null;
}
@@ -63,6 +64,7 @@ public class PigBooleanRawComparator ext
* then BooleanWritable.compare() is used. If both are null then the indices
* are compared. Otherwise the null one is defined to be less.
*/
+ @Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int rc = 0;
@@ -72,8 +74,10 @@ public class PigBooleanRawComparator ext
byte byte2 = b2[s2 + 1];
rc = (byte1 < byte2) ? -1 : ((byte1 > byte2) ? 1 : 0);
} else {
- // For sorting purposes two nulls are equal.
- if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+ // Two nulls are equal if indices are same
+ if (b1[s1] != 0 && b2[s2] != 0) {
+ rc = b1[s1 + 1] - b2[s2 + 1];
+ }
else if (b1[s1] != 0) rc = -1;
else rc = 1;
}
@@ -81,6 +85,7 @@ public class PigBooleanRawComparator ext
return rc;
}
+ @Override
public int compare(Object o1, Object o2) {
NullableBooleanWritable nbw1 = (NullableBooleanWritable)o1;
NullableBooleanWritable nbw2 = (NullableBooleanWritable)o2;
@@ -90,8 +95,10 @@ public class PigBooleanRawComparator ext
if (!nbw1.isNull() && !nbw2.isNull()) {
rc = ((Boolean)nbw1.getValueAsPigType()).compareTo((Boolean)nbw2.getValueAsPigType());
} else {
- // For sorting purposes two nulls are equal.
- if (nbw1.isNull() && nbw2.isNull()) rc = 0;
+ // Two nulls are equal if indices are same
+ if (nbw1.isNull() && nbw2.isNull()) {
+ rc = nbw1.getIndex() - nbw2.getIndex();
+ }
else if (nbw1.isNull()) rc = -1;
else rc = 1;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java Tue Aug 11 23:01:10 2015
@@ -24,7 +24,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.data.BinInterSedes;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.io.NullableBytesWritable;
@@ -34,13 +33,14 @@ public class PigBytesRawComparator exten
private final Log mLog = LogFactory.getLog(getClass());
private boolean[] mAsc;
- private WritableComparator mWrappedComp;
+ private BinInterSedes.BinInterSedesTupleRawComparator mWrappedComp;
public PigBytesRawComparator() {
super(NullableBytesWritable.class);
mWrappedComp = new BinInterSedes.BinInterSedesTupleRawComparator();
}
+ @Override
public void setConf(Configuration conf) {
try {
mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
@@ -57,6 +57,7 @@ public class PigBytesRawComparator exten
((BinInterSedes.BinInterSedesTupleRawComparator)mWrappedComp).setConf(conf);
}
+ @Override
public Configuration getConf() {
return null;
}
@@ -69,6 +70,7 @@ public class PigBytesRawComparator exten
* For non-bytearrays, we use BinInterSedesTupleRawComparator.
* If either is null, null one is defined to be less.
*/
+ @Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int rc = 0;
@@ -125,11 +127,17 @@ public class PigBytesRawComparator exten
// Subtract 2, one for null byte and one for index byte. Also, do not reverse the sign
// of rc when mAsc[0] is false because BinInterSedesTupleRawComparator.compare() already
// takes that into account.
- return mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
+ rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
+ // handle PIG-927. If tuples are equal but any field inside tuple is null,
+ // then we do not merge keys if indices are not same
+ if (rc == 0 && mWrappedComp.hasComparedTupleNull())
+ rc = b1[s1 + 1] - b2[s2 + 1];
}
} else {
- // For sorting purposes two nulls are equal.
- if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+ // Two nulls are equal if indices are same
+ if (b1[s1] != 0 && b2[s2] != 0) {
+ rc = b1[s1 + 1] - b2[s2 + 1];
+ }
else if (b1[s1] != 0) rc = -1;
else rc = 1;
}
@@ -137,6 +145,7 @@ public class PigBytesRawComparator exten
return rc;
}
+ @Override
public int compare(Object o1, Object o2) {
NullableBytesWritable nbw1 = (NullableBytesWritable)o1;
NullableBytesWritable nbw2 = (NullableBytesWritable)o2;
@@ -146,8 +155,10 @@ public class PigBytesRawComparator exten
if (!nbw1.isNull() && !nbw2.isNull()) {
rc = DataType.compare(nbw1.getValueAsPigType(), nbw2.getValueAsPigType());
} else {
- // For sorting purposes two nulls are equal.
- if (nbw1.isNull() && nbw2.isNull()) rc = 0;
+ // Two nulls are equal if indices are same
+ if (nbw1.isNull() && nbw2.isNull()) {
+ rc = nbw1.getIndex() - nbw2.getIndex();
+ }
else if (nbw1.isNull()) rc = -1;
else rc = 1;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java Tue Aug 11 23:01:10 2015
@@ -20,17 +20,15 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
-import org.joda.time.DateTime;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.backend.hadoop.DateTimeWritable;
import org.apache.pig.impl.io.NullableDateTimeWritable;
import org.apache.pig.impl.util.ObjectSerializer;
+import org.joda.time.DateTime;
public class PigDateTimeRawComparator extends WritableComparator implements
Configurable {
@@ -44,6 +42,7 @@ public class PigDateTimeRawComparator ex
mWrappedComp = new DateTimeWritable.Comparator();
}
+ @Override
public void setConf(Configuration conf) {
try {
mAsc = (boolean[]) ObjectSerializer.deserialize(conf
@@ -59,6 +58,7 @@ public class PigDateTimeRawComparator ex
}
}
+ @Override
public Configuration getConf() {
return null;
}
@@ -68,6 +68,7 @@ public class PigDateTimeRawComparator ex
* IntWritable.compare() is used. If both are null then the indices are
* compared. Otherwise the null one is defined to be less.
*/
+ @Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int rc = 0;
@@ -75,9 +76,10 @@ public class PigDateTimeRawComparator ex
if (b1[s1] == 0 && b2[s2] == 0) {
rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
} else {
- // For sorting purposes two nulls are equal.
- if (b1[s1] != 0 && b2[s2] != 0)
- rc = 0;
+ // Two nulls are equal if indices are same
+ if (b1[s1] != 0 && b2[s2] != 0) {
+ rc = b1[s1 + 1] - b2[s2 + 1];
+ }
else if (b1[s1] != 0)
rc = -1;
else
@@ -88,6 +90,7 @@ public class PigDateTimeRawComparator ex
return rc;
}
+ @Override
public int compare(Object o1, Object o2) {
NullableDateTimeWritable ndtw1 = (NullableDateTimeWritable) o1;
NullableDateTimeWritable ndtw2 = (NullableDateTimeWritable) o2;
@@ -98,9 +101,10 @@ public class PigDateTimeRawComparator ex
rc = ((DateTime) ndtw1.getValueAsPigType())
.compareTo((DateTime) ndtw2.getValueAsPigType());
} else {
- // For sorting purposes two nulls are equal.
- if (ndtw1.isNull() && ndtw2.isNull())
- rc = 0;
+ // Two nulls are equal if indices are same
+ if (ndtw1.isNull() && ndtw2.isNull()) {
+ rc = ndtw1.getIndex() - ndtw2.getIndex();
+ }
else if (ndtw1.isNull())
rc = -1;
else
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java Tue Aug 11 23:01:10 2015
@@ -21,12 +21,9 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-
import org.apache.pig.backend.hadoop.DoubleWritable;
import org.apache.pig.impl.io.NullableDoubleWritable;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -42,6 +39,7 @@ public class PigDoubleRawComparator exte
mWrappedComp = new DoubleWritable.Comparator();
}
+ @Override
public void setConf(Configuration conf) {
try {
mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
@@ -57,6 +55,7 @@ public class PigDoubleRawComparator exte
}
}
+ @Override
public Configuration getConf() {
return null;
}
@@ -66,6 +65,7 @@ public class PigDoubleRawComparator exte
* then IntWritable.compare() is used. If both are null then the indices
* are compared. Otherwise the null one is defined to be less.
*/
+ @Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int rc = 0;
@@ -73,8 +73,10 @@ public class PigDoubleRawComparator exte
if (b1[s1] == 0 && b2[s2] == 0) {
rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
} else {
- // For sorting purposes two nulls are equal.
- if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+ // Two nulls are equal if indices are same
+ if (b1[s1] != 0 && b2[s2] != 0) {
+ rc = b1[s1 + 1] - b2[s2 + 1];
+ }
else if (b1[s1] != 0) rc = -1;
else rc = 1;
}
@@ -82,6 +84,7 @@ public class PigDoubleRawComparator exte
return rc;
}
+ @Override
public int compare(Object o1, Object o2) {
NullableDoubleWritable ndw1 = (NullableDoubleWritable)o1;
NullableDoubleWritable ndw2 = (NullableDoubleWritable)o2;
@@ -91,8 +94,10 @@ public class PigDoubleRawComparator exte
if (!ndw1.isNull() && !ndw2.isNull()) {
rc = ((Double)ndw1.getValueAsPigType()).compareTo((Double)ndw2.getValueAsPigType());
} else {
- // For sorting purposes two nulls are equal.
- if (ndw1.isNull() && ndw2.isNull()) rc = 0;
+ // Two nulls are equal if indices are same
+ if (ndw1.isNull() && ndw2.isNull()) {
+ rc = ndw1.getIndex() - ndw2.getIndex();
+ }
else if (ndw1.isNull()) rc = -1;
else rc = 1;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java Tue Aug 11 23:01:10 2015
@@ -21,13 +21,10 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-
import org.apache.pig.impl.io.NullableFloatWritable;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -42,6 +39,7 @@ public class PigFloatRawComparator exten
mWrappedComp = new FloatWritable.Comparator();
}
+ @Override
public void setConf(Configuration conf) {
try {
mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
@@ -57,6 +55,7 @@ public class PigFloatRawComparator exten
}
}
+ @Override
public Configuration getConf() {
return null;
}
@@ -66,6 +65,7 @@ public class PigFloatRawComparator exten
* then IntWritable.compare() is used. If both are null then the indices
* are compared. Otherwise the null one is defined to be less.
*/
+ @Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int rc = 0;
@@ -73,8 +73,10 @@ public class PigFloatRawComparator exten
if (b1[s1] == 0 && b2[s2] == 0) {
rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
} else {
- // For sorting purposes two nulls are equal.
- if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+ // Two nulls are equal if indices are same
+ if (b1[s1] != 0 && b2[s2] != 0) {
+ rc = b1[s1 + 1] - b2[s2 + 1];
+ }
else if (b1[s1] != 0) rc = -1;
else rc = 1;
}
@@ -82,6 +84,7 @@ public class PigFloatRawComparator exten
return rc;
}
+ @Override
public int compare(Object o1, Object o2) {
NullableFloatWritable nfw1 = (NullableFloatWritable)o1;
NullableFloatWritable nfw2 = (NullableFloatWritable)o2;
@@ -91,8 +94,10 @@ public class PigFloatRawComparator exten
if (!nfw1.isNull() && !nfw2.isNull()) {
rc = ((Float)nfw1.getValueAsPigType()).compareTo((Float)nfw2.getValueAsPigType());
} else {
- // For sorting purposes two nulls are equal.
- if (nfw1.isNull() && nfw2.isNull()) rc = 0;
+ // Two nulls are equal if indices are same
+ if (nfw1.isNull() && nfw2.isNull()) {
+ rc = nfw1.getIndex() - nfw2.getIndex();
+ }
else if (nfw1.isNull()) rc = -1;
else rc = 1;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java Tue Aug 11 23:01:10 2015
@@ -21,12 +21,9 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-
import org.apache.pig.impl.io.NullableIntWritable;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -39,6 +36,7 @@ public class PigIntRawComparator extends
super(NullableIntWritable.class);
}
+ @Override
public void setConf(Configuration conf) {
try {
mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
@@ -54,6 +52,7 @@ public class PigIntRawComparator extends
}
}
+ @Override
public Configuration getConf() {
return null;
}
@@ -63,6 +62,7 @@ public class PigIntRawComparator extends
* then IntWritable.compare() is used. If both are null then the indices
* are compared. Otherwise the null one is defined to be less.
*/
+ @Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int rc = 0;
@@ -72,8 +72,10 @@ public class PigIntRawComparator extends
int int2 = readInt(b2, s2 + 1);
rc = (int1 < int2) ? -1 : ((int1 > int2) ? 1 : 0);
} else {
- // For sorting purposes two nulls are equal.
- if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+ // Two nulls are equal if indices are same
+ if (b1[s1] != 0 && b2[s2] != 0) {
+ rc = b1[s1 + 1] - b2[s2 + 1];
+ }
else if (b1[s1] != 0) rc = -1;
else rc = 1;
}
@@ -81,6 +83,7 @@ public class PigIntRawComparator extends
return rc;
}
+ @Override
public int compare(Object o1, Object o2) {
NullableIntWritable niw1 = (NullableIntWritable)o1;
NullableIntWritable niw2 = (NullableIntWritable)o2;
@@ -90,8 +93,10 @@ public class PigIntRawComparator extends
if (!niw1.isNull() && !niw2.isNull()) {
rc = ((Integer)niw1.getValueAsPigType()).compareTo((Integer)niw2.getValueAsPigType());
} else {
- // For sorting purposes two nulls are equal.
- if (niw1.isNull() && niw2.isNull()) rc = 0;
+ // Two nulls are equal if indices are same
+ if (niw1.isNull() && niw2.isNull()) {
+ rc = niw1.getIndex() - niw2.getIndex();
+ }
else if (niw1.isNull()) rc = -1;
else rc = 1;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java Tue Aug 11 23:01:10 2015
@@ -21,22 +21,18 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-
-import org.apache.pig.impl.io.NullableLongWritable;
import org.apache.pig.impl.io.NullableLongWritable;
import org.apache.pig.impl.util.ObjectSerializer;
public class PigLongRawComparator extends WritableComparator implements Configurable {
- private final Log mLog = LogFactory.getLog(getClass());
- private boolean[] mAsc;
- private LongWritable.Comparator mWrappedComp;
+ protected final Log mLog = LogFactory.getLog(getClass());
+ protected boolean[] mAsc;
+ protected LongWritable.Comparator mWrappedComp;
public PigLongRawComparator() {
super(NullableLongWritable.class);
@@ -44,6 +40,7 @@ public class PigLongRawComparator extend
}
+ @Override
public void setConf(Configuration conf) {
try {
mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
@@ -59,6 +56,7 @@ public class PigLongRawComparator extend
}
}
+ @Override
public Configuration getConf() {
return null;
}
@@ -68,6 +66,7 @@ public class PigLongRawComparator extend
* then IntWritable.compare() is used. If both are null then the indices
* are compared. Otherwise the null one is defined to be less.
*/
+ @Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int rc = 0;
@@ -75,8 +74,10 @@ public class PigLongRawComparator extend
if (b1[s1] == 0 && b2[s2] == 0) {
rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
} else {
- // For sorting purposes two nulls are equal.
- if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+ // Two nulls are equal if indices are same
+ if (b1[s1] != 0 && b2[s2] != 0) {
+ rc = b1[s1 + 1] - b2[s2 + 1];
+ }
else if (b1[s1] != 0) rc = -1;
else rc = 1;
}
@@ -84,6 +85,7 @@ public class PigLongRawComparator extend
return rc;
}
+ @Override
public int compare(Object o1, Object o2) {
NullableLongWritable nlw1 = (NullableLongWritable)o1;
NullableLongWritable nlw2 = (NullableLongWritable)o2;
@@ -93,8 +95,10 @@ public class PigLongRawComparator extend
if (!nlw1.isNull() && !nlw2.isNull()) {
rc = ((Long)nlw1.getValueAsPigType()).compareTo((Long)nlw2.getValueAsPigType());
} else {
- // For sorting purposes two nulls are equal.
- if (nlw1.isNull() && nlw2.isNull()) rc = 0;
+ // Two nulls are equal if indices are same
+ if (nlw1.isNull() && nlw2.isNull()) {
+ rc = nlw1.getIndex() - nlw2.getIndex();
+ }
else if (nlw1.isNull()) rc = -1;
else rc = 1;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java Tue Aug 11 23:01:10 2015
@@ -21,13 +21,10 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-
import org.apache.pig.impl.io.NullableText;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -43,6 +40,7 @@ public class PigTextRawComparator extend
}
+ @Override
public void setConf(Configuration conf) {
try {
mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
@@ -57,6 +55,7 @@ public class PigTextRawComparator extend
}
}
+ @Override
public Configuration getConf() {
return null;
}
@@ -66,6 +65,7 @@ public class PigTextRawComparator extend
* then IntWritable.compare() is used. If both are null then the indices
* are compared. Otherwise the null one is defined to be less.
*/
+ @Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int rc = 0;
@@ -73,8 +73,10 @@ public class PigTextRawComparator extend
if (b1[s1] == 0 && b2[s2] == 0) {
rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
} else {
- // For sorting purposes two nulls are equal.
- if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+ // Two nulls are equal if indices are same
+ if (b1[s1] != 0 && b2[s2] != 0) {
+ rc = b1[s1 + 1] - b2[s2 + 1];
+ }
else if (b1[s1] != 0) rc = -1;
else rc = 1;
}
@@ -82,6 +84,7 @@ public class PigTextRawComparator extend
return rc;
}
+ @Override
public int compare(Object o1, Object o2) {
NullableText nt1 = (NullableText)o1;
NullableText nt2 = (NullableText)o2;
@@ -91,8 +94,10 @@ public class PigTextRawComparator extend
if (!nt1.isNull() && !nt2.isNull()) {
rc = ((String)nt1.getValueAsPigType()).compareTo((String)nt2.getValueAsPigType());
} else {
- // For sorting purposes two nulls are equal.
- if (nt1.isNull() && nt2.isNull()) rc = 0;
+ // Two nulls are equal if indices are same
+ if (nt1.isNull() && nt2.isNull()) {
+ rc = nt1.getIndex() - nt2.getIndex();
+ }
else if (nt1.isNull()) rc = -1;
else rc = 1;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java Tue Aug 11 23:01:10 2015
@@ -25,7 +25,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BinInterSedes;
import org.apache.pig.data.DataType;
@@ -46,6 +45,7 @@ public class PigTupleDefaultRawComparato
super(TupleFactory.getInstance().tupleClass());
}
+ @Override
public void setConf(Configuration conf) {
try {
mAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.sortOrder"));
@@ -62,6 +62,7 @@ public class PigTupleDefaultRawComparato
mWholeTuple = (mAsc.length == 1);
}
+ @Override
public Configuration getConf() {
return null;
}
@@ -78,9 +79,9 @@ public class PigTupleDefaultRawComparato
* IntWritable.compare() is used. If both are null then the indices are
* compared. Otherwise the null one is defined to be less.
*/
+ @Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int rc = 0;
- mHasNullField = false;
Tuple t1;
Tuple t2;
@@ -99,9 +100,16 @@ public class PigTupleDefaultRawComparato
rc = compareTuple(t1, t2); //TODO think about how SchemaTuple could speed this up
+ // handle PIG-927. If tuples are equal but any field inside tuple is null,
+ // then we do not merge keys if indices are not same
+ if (rc == 0 && mHasNullField) {
+ rc = ((NullableTuple) t1).getIndex() - ((NullableTuple) t2).getIndex();
+ }
+
return rc;
}
+ @Override
public int compare(Object o1, Object o2) {
NullableTuple nt1 = (NullableTuple) o1;
NullableTuple nt2 = (NullableTuple) o2;
@@ -110,10 +118,16 @@ public class PigTupleDefaultRawComparato
// If either are null, handle differently.
if (!nt1.isNull() && !nt2.isNull()) {
rc = compareTuple((Tuple) nt1.getValueAsPigType(), (Tuple) nt2.getValueAsPigType());
+ // handle PIG-927. If tuples are equal but any field inside tuple is null,
+ // then we do not merge keys if indices are not same
+ if (rc == 0 && mHasNullField) {
+ rc = nt1.getIndex() - nt2.getIndex();
+ }
} else {
- // For sorting purposes two nulls are equal.
- if (nt1.isNull() && nt2.isNull())
- rc = 0;
+ // Two nulls are equal if indices are same
+ if (nt1.isNull() && nt2.isNull()) {
+ rc = nt1.getIndex() - nt2.getIndex();
+ }
else if (nt1.isNull())
rc = -1;
else
@@ -125,6 +139,7 @@ public class PigTupleDefaultRawComparato
}
private int compareTuple(Tuple t1, Tuple t2) {
+ mHasNullField = false;
int sz1 = t1.size();
int sz2 = t2.size();
if (sz2 < sz1) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java Tue Aug 11 23:01:10 2015
@@ -23,9 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
@@ -90,18 +88,26 @@ public class PigTupleSortComparator exte
}
/**
- * Compare two NullableTuples as raw bytes. Tuples are compared field-wise. If both are null they are defined equal.
+ * Compare two NullableTuples as raw bytes. Tuples are compared field-wise.
+ * If both are null, then the indices are compared.
* Otherwise the null one is defined to be less.
*/
+ @Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int rc = 0;
if (b1[s1] == 0 && b2[s2] == 0) {
// skip mNull and mIndex
rc = mComparator.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
+ // handle PIG-927. If tuples are equal but any field inside tuple is null,
+ // then we do not merge keys if indices are not same
+ if (rc == 0 && mComparator.hasComparedTupleNull()) {
+ rc = b1[s1 + 1] - b2[s2 + 1];
+ }
} else {
- // for sorting purposes two nulls are equal, null sorts first
- if (b1[s1] != 0 && b2[s2] != 0)
- rc = 0;
+ // Two nulls are equal if indices are same
+ if (b1[s1] != 0 && b2[s2] != 0) {
+ rc = b1[s1 + 1] - b2[s2 + 1];
+ }
else if (b1[s1] != 0)
rc = -1;
else
@@ -112,6 +118,7 @@ public class PigTupleSortComparator exte
return rc;
}
+ @Override
@SuppressWarnings("unchecked")
public int compare(Object o1, Object o2) {
NullableTuple nt1 = (NullableTuple) o1;
@@ -121,10 +128,16 @@ public class PigTupleSortComparator exte
// If either are null, handle differently.
if (!nt1.isNull() && !nt2.isNull()) {
rc = mComparator.compare((Tuple) nt1.getValueAsPigType(), (Tuple) nt2.getValueAsPigType());
+ // handle PIG-927. If tuples are equal but any field inside tuple is null,
+ // then we do not merge keys if indices are not same
+ if (rc == 0 && mComparator.hasComparedTupleNull()) {
+ rc = nt1.getIndex() - nt2.getIndex();
+ }
} else {
- // For sorting purposes two nulls are equal.
- if (nt1.isNull() && nt2.isNull())
- rc = 0;
+ // Two nulls are equal if indices are same
+ if (nt1.isNull() && nt2.isNull()) {
+ rc = nt1.getIndex() - nt2.getIndex();
+ }
else if (nt1.isNull())
rc = -1;
else
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue Aug 11 23:01:10 2015
@@ -58,18 +58,7 @@ import org.apache.pig.backend.hadoop.dat
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingBigDecimalWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingBigIntegerWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingBooleanWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingCharArrayWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingDBAWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingDateTimeWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingDoubleWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingFloatWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingIntWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingLongWritableComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingPartitionWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingTupleWritableComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigSecondaryKeyGroupComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
@@ -739,7 +728,7 @@ public class TezDagBuilder extends TezOp
parallel = -1;
}
Resource resource;
- if (globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB)!=null &&
+ if (globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB)!=null &&
globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES)!=null) {
resource = Resource.newInstance(globalConf.getInt(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB,
TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT),
@@ -992,7 +981,7 @@ public class TezDagBuilder extends TezOp
selectOutputComparator(keyType, conf, tezOp);
}
- private static Class<? extends WritableComparator> comparatorForKeyType(byte keyType, boolean hasOrderBy)
+ private static Class<? extends WritableComparator> getRawComparator(byte keyType)
throws JobCreationException {
switch (keyType) {
@@ -1024,11 +1013,7 @@ public class TezDagBuilder extends TezOp
return PigTextRawComparator.class;
case DataType.BYTEARRAY:
- //if (hasOrderBy) {
- return PigBytesRawComparator.class;
- //} else {
- // return PigDBAWritableComparator.class;
- //}
+ return PigBytesRawComparator.class;
case DataType.MAP:
int errCode = 1068;
@@ -1036,68 +1021,7 @@ public class TezDagBuilder extends TezOp
throw new JobCreationException(msg, errCode, PigException.INPUT);
case DataType.TUPLE:
- //TODO: PigTupleWritableComparator gives wrong results with cogroup in
- //Checkin_2 and few other e2e tests. But MR has PigTupleWritableComparator
- //Investigate the difference later
- //if (hasOrderBy) {
- return PigTupleSortComparator.class;
- //} else {
- // return PigTupleWritableComparator.class;
- //}
-
- case DataType.BAG:
- errCode = 1068;
- msg = "Using Bag as key not supported.";
- throw new JobCreationException(msg, errCode, PigException.INPUT);
-
- default:
- errCode = 2036;
- msg = "Unhandled key type " + DataType.findTypeName(keyType);
- throw new JobCreationException(msg, errCode, PigException.BUG);
- }
- }
-
- private static Class<? extends WritableComparator> getGroupingComparatorForKeyType(byte keyType)
- throws JobCreationException {
-
- switch (keyType) {
- case DataType.BOOLEAN:
- return PigGroupingBooleanWritableComparator.class;
-
- case DataType.INTEGER:
- return PigGroupingIntWritableComparator.class;
-
- case DataType.BIGINTEGER:
- return PigGroupingBigIntegerWritableComparator.class;
-
- case DataType.BIGDECIMAL:
- return PigGroupingBigDecimalWritableComparator.class;
-
- case DataType.LONG:
- return PigGroupingLongWritableComparator.class;
-
- case DataType.FLOAT:
- return PigGroupingFloatWritableComparator.class;
-
- case DataType.DOUBLE:
- return PigGroupingDoubleWritableComparator.class;
-
- case DataType.DATETIME:
- return PigGroupingDateTimeWritableComparator.class;
-
- case DataType.CHARARRAY:
- return PigGroupingCharArrayWritableComparator.class;
-
- case DataType.BYTEARRAY:
- return PigGroupingDBAWritableComparator.class;
-
- case DataType.MAP:
- int errCode = 1068;
- String msg = "Using Map as key not supported.";
- throw new JobCreationException(msg, errCode, PigException.INPUT);
-
- case DataType.TUPLE:
- return PigGroupingTupleWritableComparator.class;
+ return PigTupleSortComparator.class;
case DataType.BAG:
errCode = 1068;
@@ -1127,29 +1051,11 @@ public class TezDagBuilder extends TezOp
PigGroupingPartitionWritableComparator.class.getName());
setGroupingComparator(conf, PigGroupingPartitionWritableComparator.class.getName());
} else {
- boolean hasOrderby = hasOrderby(tezOp);
conf.setClass(
TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
- comparatorForKeyType(keyType, hasOrderby), RawComparator.class);
- if (!hasOrderby) {
- setGroupingComparator(conf, getGroupingComparatorForKeyType(keyType).getName());
- }
- }
- }
- }
-
- private boolean hasOrderby(TezOperator tezOp) {
- boolean hasOrderBy = tezOp.isGlobalSort() || tezOp.isLimitAfterSort();
- if (!hasOrderBy) {
- // Check if it is a Orderby sampler job
- List<TezOperator> succs = getPlan().getSuccessors(tezOp);
- if (succs != null && succs.size() == 1) {
- if (succs.get(0).isGlobalSort()) {
- hasOrderBy = true;
- }
+ getRawComparator(keyType), RawComparator.class);
}
}
- return hasOrderBy;
}
private void setGroupingComparator(Configuration conf, String comparatorClass) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java Tue Aug 11 23:01:10 2015
@@ -93,9 +93,9 @@ public class POShuffleTezLoad extends PO
try {
for (String inputKey : inputKeys) {
LogicalInput input = inputs.get(inputKey);
- // 1) Case of self join/cogroup/cross with Split.
+ // 1) Case of self join/cogroup/cross with Split - numTezInputs < numInputs/inputKeys
// - Same TezInput will contain multiple indexes in case of join
- // 2) data unioned within Split
+ // 2) data unioned within Split - inputKeys > numInputs/numTezInputs
// - Input key will be repeated, but index would be same within a TezInput
if (!this.inputs.contains(input)) {
this.inputs.add(input);
@@ -133,7 +133,6 @@ public class POShuffleTezLoad extends PO
boolean hasData = false;
Object cur = null;
PigNullableWritable min = null;
- int minIndex = -1;
try {
for (int i = 0; i < numTezInputs; i++) {
@@ -143,7 +142,6 @@ public class POShuffleTezLoad extends PO
if (min == null || comparator.compare(min, cur) > 0) {
//Not a deep clone. Writable is referenced.
min = ((PigNullableWritable)cur).clone();
- minIndex = i;
}
}
}
@@ -169,7 +167,6 @@ public class POShuffleTezLoad extends PO
if (isAccumulative()) {
buffer.setCurrentKey(min);
- buffer.setCurrentKeyIndex(minIndex);
for (int i = 0; i < numInputs; i++) {
bags[i] = new AccumulativeBag(buffer, i);
}
@@ -185,8 +182,7 @@ public class POShuffleTezLoad extends PO
if (!finished[i]) {
cur = readers.get(i).getCurrentKey();
// We need to loop in case of Grouping Comparators
- while (comparator.compare(min, cur) == 0
- && (!min.isNull() || (min.isNull() && i == minIndex))) {
+ while (comparator.compare(min, cur) == 0) {
Iterable<Object> vals = readers.get(i).getCurrentValues();
for (Object val : vals) {
NullableTuple nTup = (NullableTuple) val;
@@ -241,7 +237,6 @@ public class POShuffleTezLoad extends PO
private int batchSize;
private List<Tuple>[] bags;
private PigNullableWritable min;
- private int minIndex;
private boolean clearedCurrent = true;
@SuppressWarnings("unchecked")
@@ -262,10 +257,6 @@ public class POShuffleTezLoad extends PO
clearedCurrent = false;
}
- public void setCurrentKeyIndex(int curKeyIndex) {
- this.minIndex = curKeyIndex;
- }
-
@Override
public boolean hasNextBatch() {
Object cur = null;
@@ -273,8 +264,7 @@ public class POShuffleTezLoad extends PO
for (int i = 0; i < numTezInputs; i++) {
if (!finished[i]) {
cur = readers.get(i).getCurrentKey();
- if (comparator.compare(min, cur) == 0
- && (!min.isNull() || (min.isNull() && i == minIndex))) {
+ if (comparator.compare(min, cur) == 0) {
return true;
}
}
@@ -297,8 +287,7 @@ public class POShuffleTezLoad extends PO
if (!finished[i]) {
cur = readers.get(i).getCurrentKey();
int batchCount = 0;
- while (comparator.compare(min, cur) == 0 && (!min.isNull() ||
- min.isNull() && i==minIndex)) {
+ while (comparator.compare(min, cur) == 0) {
Iterator<Object> iter = readers.get(i).getCurrentValues().iterator();
while (iter.hasNext() && batchCount < batchSize) {
NullableTuple nTup = (NullableTuple) iter.next();
@@ -339,8 +328,7 @@ public class POShuffleTezLoad extends PO
for (int i = 0; i < numTezInputs; i++) {
if (!finished[i]) {
cur = readers.get(i).getCurrentKey();
- while (comparator.compare(min, cur) == 0 && (!min.isNull() ||
- min.isNull() && i==minIndex)) {
+ while (comparator.compare(min, cur) == 0) {
finished[i] = !readers.get(i).next();
if (finished[i]) {
break;
Modified: pig/trunk/test/e2e/pig/tests/multiquery.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/multiquery.conf?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/multiquery.conf (original)
+++ pig/trunk/test/e2e/pig/tests/multiquery.conf Tue Aug 11 23:01:10 2015
@@ -572,8 +572,6 @@ $cfg = {
{
'name' => 'MultiQuery_Union',
- 'floatpostprocess' => 1,
- 'delimiter' => ' ',
'tests' => [
{
# Multiple levels of union + join
@@ -639,6 +637,8 @@ store e into ':OUTPATH:';\,
{
# Union + Groupby + Combiner
'num' => 6,
+ 'floatpostprocess' => 1,
+ 'delimiter' => ' ',
'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
a1 = filter a by gpa >= 3.9;
a2 = filter a by gpa < 2;
@@ -650,6 +650,8 @@ store e into ':OUTPATH:';\,
{
# Union + Groupby + Secondary key partitioner
'num' => 7,
+ 'floatpostprocess' => 1,
+ 'delimiter' => ' ',
'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
a1 = filter a by gpa >= 3.9;
a2 = filter a by gpa < 2;
@@ -674,13 +676,11 @@ store d into ':OUTPATH:';\,
{
'name' => 'MultiQuery_Self',
- 'floatpostprocess' => 1,
- 'delimiter' => ' ',
'tests' => [
# Self cross
{
'num' => 1,
- 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
b = filter a by gpa >= 3.9;
c = filter a by gpa <= 0.5;
d = filter a by gpa >= 3.5 and gpa < 3.9;
@@ -693,8 +693,8 @@ store g into ':OUTPATH:.2';\,
{
# Self cogroup
'num' => 2,
- 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
-b = filter a by gpa >= 3.9;
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3;
c = filter a by gpa < 2;
d = cogroup c by name, b by name;
e = foreach d generate flatten(c), flatten(b);
@@ -703,18 +703,18 @@ store e into ':OUTPATH:';\,
{
# Three way join (two self)
'num' => 3,
- 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
-b = filter a by gpa >= 3.9;
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3;
c = filter a by gpa < 2;
-d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+d = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
e = join b by name, c by name, d by name PARALLEL 2;
store e into ':OUTPATH:';\,
},
{
# Self join replicated
'num' => 4,
- 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
-b = filter a by gpa >= 3.9;
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3;
c = filter a by gpa < 2;
d = join c by name, b by name using 'replicated';
store d into ':OUTPATH:';\,
@@ -722,12 +722,39 @@ store d into ':OUTPATH:';\,
{
# Self join skewed
'num' => 5,
- 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
-b = filter a by gpa >= 3.9;
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3;
c = filter a by gpa < 2;
d = join c by name, b by name using 'skewed' PARALLEL 2;
store d into ':OUTPATH:';\,
},
+ {
+ # Self join left outer
+ 'num' => 6,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3;
+c = filter a by gpa < 2;
+d = join c by name left outer, b by name PARALLEL 2;
+store d into ':OUTPATH:';\,
+ },
+ {
+ # Self join right outer
+ 'num' => 7,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3;
+c = filter a by gpa < 2;
+d = join c by name right outer, b by name PARALLEL 2;
+store d into ':OUTPATH:';\,
+ },
+ {
+ # Self join full outer
+ 'num' => 8,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa);
+b = filter a by gpa >= 3;
+c = filter a by gpa < 2;
+d = join c by name full outer, b by name PARALLEL 2;
+store d into ':OUTPATH:';\,
+ }
] # end of tests
},