You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2013/09/25 00:54:43 UTC
svn commit: r1526045 - in /pig/branches/branch-0.12: ./
src/org/apache/pig/backend/hadoop/ src/org/apache/pig/data/
src/org/apache/pig/impl/io/ test/org/apache/pig/test/
Author: dvryaboy
Date: Tue Sep 24 22:54:42 2013
New Revision: 1526045
URL: http://svn.apache.org/r1526045
Log:
PIG-3479: Fix BigInt, BigDec, Date serialization. Improve perf of PigNullableWritable deserialization
Modified:
pig/branches/branch-0.12/CHANGES.txt
pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/BigDecimalWritable.java
pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/BigIntegerWritable.java
pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/HDataType.java
pig/branches/branch-0.12/src/org/apache/pig/data/BinInterSedes.java
pig/branches/branch-0.12/src/org/apache/pig/data/BinSedesTuple.java
pig/branches/branch-0.12/src/org/apache/pig/data/DataReaderWriter.java
pig/branches/branch-0.12/src/org/apache/pig/impl/io/NullablePartitionWritable.java
pig/branches/branch-0.12/src/org/apache/pig/impl/io/NullableTuple.java
pig/branches/branch-0.12/src/org/apache/pig/impl/io/PigNullableWritable.java
pig/branches/branch-0.12/test/org/apache/pig/test/TestPackage.java
Modified: pig/branches/branch-0.12/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/CHANGES.txt?rev=1526045&r1=1526044&r2=1526045&view=diff
==============================================================================
--- pig/branches/branch-0.12/CHANGES.txt (original)
+++ pig/branches/branch-0.12/CHANGES.txt Tue Sep 24 22:54:42 2013
@@ -30,6 +30,8 @@ PIG-3174: Remove rpm and deb artifacts f
IMPROVEMENTS
+PIG-3479: Fix BigInt, BigDec, Date serialization. Improve perf of PigNullableWritable deserialization (dvryaboy)
+
PIG-3461: Rewrite PartitionFilterOptimizer to make it work for all the cases (aniket486)
PIG-2417: Streaming UDFs - allow users to easily write UDFs in scripting languages with no
Modified: pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/BigDecimalWritable.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/BigDecimalWritable.java?rev=1526045&r1=1526044&r2=1526045&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/BigDecimalWritable.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/BigDecimalWritable.java Tue Sep 24 22:54:42 2013
@@ -58,7 +58,7 @@ public class BigDecimalWritable implemen
@Override
public void readFields(DataInput in) throws IOException {
- value = (BigDecimal)bis.readDatum(in, DataType.BIGDECIMAL);
+ value = (BigDecimal)bis.readDatum(in);
}
@Override
@@ -76,6 +76,7 @@ public class BigDecimalWritable implemen
super(BigDecimalWritable.class);
}
+ @Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
try {
Modified: pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/BigIntegerWritable.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/BigIntegerWritable.java?rev=1526045&r1=1526044&r2=1526045&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/BigIntegerWritable.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/BigIntegerWritable.java Tue Sep 24 22:54:42 2013
@@ -58,7 +58,7 @@ public class BigIntegerWritable implemen
@Override
public void readFields(DataInput in) throws IOException {
- value = (BigInteger)bis.readDatum(in, DataType.BIGINTEGER);
+ value = (BigInteger)bis.readDatum(in);
}
@Override
@@ -76,6 +76,7 @@ public class BigIntegerWritable implemen
super(BigIntegerWritable.class);
}
+ @Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
try {
Modified: pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/HDataType.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/HDataType.java?rev=1526045&r1=1526044&r2=1526045&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/HDataType.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/HDataType.java Tue Sep 24 22:54:42 2013
@@ -19,6 +19,8 @@ package org.apache.pig.backend.hadoop;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.Map;
import org.apache.pig.PigException;
@@ -60,6 +62,30 @@ public class HDataType {
static NullableTuple defTup = new NullableTuple();
static Map<Byte, String> typeToName = null;
+ private static final HashMap<String, Byte> classToTypeMap = new HashMap<String, Byte>();
+ static {
+ classToTypeMap.put("org.apache.pig.impl.io.NullableBag", DataType.BAG);
+ classToTypeMap.put("org.apache.pig.impl.io.NullableBigDecimalWritable", DataType.BIGDECIMAL);
+ classToTypeMap.put("org.apache.pig.impl.io.NullableBigIntegerWritable", DataType.BIGINTEGER);
+ classToTypeMap.put("org.apache.pig.impl.io.NullableBooleanWritable", DataType.BOOLEAN);
+ classToTypeMap.put("org.apache.pig.impl.io.NullableBytesWritable", DataType.BYTEARRAY);
+ classToTypeMap.put("org.apache.pig.impl.io.NullableDateTimeWritable", DataType.DATETIME);
+ classToTypeMap.put("org.apache.pig.impl.io.NullableDoubleWritable", DataType.DOUBLE);
+ classToTypeMap.put("org.apache.pig.impl.io.NullableFloatWritable", DataType.FLOAT);
+ classToTypeMap.put("org.apache.pig.impl.io.NullableIntWritable", DataType.INTEGER);
+ classToTypeMap.put("org.apache.pig.impl.io.NullableLongWritable", DataType.LONG);
+ classToTypeMap.put("org.apache.pig.impl.io.NullableText", DataType.CHARARRAY);
+ classToTypeMap.put("org.apache.pig.impl.io.NullableTuple", DataType.TUPLE);
+ }
+
+ public static PigNullableWritable getWritableComparable(String className) throws ExecException {
+ if (classToTypeMap.containsKey(className)) {
+ return getWritableComparableTypes(null, classToTypeMap.get(className));
+ } else {
+ throw new ExecException("Unable to map " + className + " to known types." + Arrays.toString(classToTypeMap.keySet().toArray()));
+ }
+ }
+
public static PigNullableWritable getWritableComparableTypes(Object o, byte keyType) throws ExecException{
byte newKeyType = keyType;
Modified: pig/branches/branch-0.12/src/org/apache/pig/data/BinInterSedes.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/data/BinInterSedes.java?rev=1526045&r1=1526044&r2=1526045&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/data/BinInterSedes.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/data/BinInterSedes.java Tue Sep 24 22:54:42 2013
@@ -435,7 +435,7 @@ public class BinInterSedes implements In
}
private Object readBigDecimal(DataInput in) throws IOException {
- return new BigDecimal((String)readDatum(in));
+ return new BigDecimal((String)readDatum(in));
}
private Object readBigInteger(DataInput in) throws IOException {
Modified: pig/branches/branch-0.12/src/org/apache/pig/data/BinSedesTuple.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/data/BinSedesTuple.java?rev=1526045&r1=1526044&r2=1526045&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/data/BinSedesTuple.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/data/BinSedesTuple.java Tue Sep 24 22:54:42 2013
@@ -34,7 +34,6 @@ public class BinSedesTuple extends Defau
private static final long serialVersionUID = 1L;
private static final InterSedes sedes = InterSedesFactory.getInterSedesInstance();
-
@Override
public void write(DataOutput out) throws IOException {
@@ -47,8 +46,8 @@ public class BinSedesTuple extends Defau
// Clear our fields, in case we're being reused.
mFields.clear();
sedes.addColsToTuple(in, this);
- }
-
+ }
+
/**
@@ -87,7 +86,7 @@ public class BinSedesTuple extends Defau
BinSedesTuple(List<Object> c, int junk) {
super(c, junk);
}
-
+
public static Class<? extends TupleRawComparator> getComparatorClass() {
return InterSedesFactory.getInterSedesInstance().getTupleRawComparatorClass();
}
Modified: pig/branches/branch-0.12/src/org/apache/pig/data/DataReaderWriter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/data/DataReaderWriter.java?rev=1526045&r1=1526044&r2=1526045&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/data/DataReaderWriter.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/data/DataReaderWriter.java Tue Sep 24 22:54:42 2013
@@ -303,6 +303,7 @@ public class DataReaderWriter {
out.writeByte(DataType.DATETIME);
out.writeLong(((DateTime)val).getMillis());
out.writeShort(((DateTime)val).getZone().getOffset((DateTime)val) / 60000);
+ break;
case DataType.BYTEARRAY: {
out.writeByte(DataType.BYTEARRAY);
Modified: pig/branches/branch-0.12/src/org/apache/pig/impl/io/NullablePartitionWritable.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/impl/io/NullablePartitionWritable.java?rev=1526045&r1=1526044&r2=1526045&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/impl/io/NullablePartitionWritable.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/impl/io/NullablePartitionWritable.java Tue Sep 24 22:54:42 2013
@@ -21,6 +21,8 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.pig.backend.hadoop.HDataType;
+
/**
* NullablePartitionWritable is an adaptor class around PigNullableWritable that adds a partition
* index to the class.
@@ -30,9 +32,9 @@ public class NullablePartitionWritable e
private PigNullableWritable key;
public NullablePartitionWritable() {
-
+
}
-
+
public NullablePartitionWritable(PigNullableWritable k) {
setKey(k);
}
@@ -53,50 +55,60 @@ public class NullablePartitionWritable e
return partitionIndex;
}
- public int compareTo(Object o) {
+ @Override
+ public int compareTo(Object o) {
return key.compareTo(((NullablePartitionWritable)o).getKey());
}
-
- public void readFields(DataInput in) throws IOException {
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
String c = in.readUTF();
- try{
- key = (PigNullableWritable)Class.forName(c).newInstance();
- }catch(Exception e) {
+ try {
+ key = HDataType.getWritableComparable(c);
+ } catch(Exception e) {
throw new IOException(e);
}
key.readFields(in);
}
- public void write(DataOutput out) throws IOException {
+ @Override
+ public void write(DataOutput out) throws IOException {
out.writeUTF(key.getClass().getName());
key.write(out);
}
- public boolean isNull() {
+ @Override
+ public boolean isNull() {
return key.isNull();
}
- public void setNull(boolean isNull) {
+ @Override
+ public void setNull(boolean isNull) {
key.setNull(isNull);
}
- public byte getIndex() {
+ @Override
+ public byte getIndex() {
return key.getIndex();
}
- public void setIndex(byte index) {
+ @Override
+ public void setIndex(byte index) {
key.setIndex(index);
}
- public Object getValueAsPigType() {
+ @Override
+ public Object getValueAsPigType() {
return key.getValueAsPigType();
}
- public int hashCode() {
+ @Override
+ public int hashCode() {
return key.hashCode();
}
-
- public String toString() {
+
+ @Override
+ public String toString() {
return "Partition: " + partitionIndex + " " + key.toString();
}
}
Modified: pig/branches/branch-0.12/src/org/apache/pig/impl/io/NullableTuple.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/impl/io/NullableTuple.java?rev=1526045&r1=1526044&r2=1526045&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/impl/io/NullableTuple.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/impl/io/NullableTuple.java Tue Sep 24 22:54:42 2013
@@ -42,6 +42,7 @@ public class NullableTuple extends PigNu
mValue = t;
}
+ @Override
public Object getValueAsPigType() {
return isNull() ? null : (Tuple)mValue;
}
Modified: pig/branches/branch-0.12/src/org/apache/pig/impl/io/PigNullableWritable.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/impl/io/PigNullableWritable.java?rev=1526045&r1=1526044&r2=1526045&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/impl/io/PigNullableWritable.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/impl/io/PigNullableWritable.java Tue Sep 24 22:54:42 2013
@@ -33,7 +33,7 @@ import org.apache.pig.data.Tuple;
* It also provides a getIndex() and setIndex() calls that are used to get
* and set the index. These can be used by LocalRearrange, the partitioner,
* and Package to determine the index.
- *
+ *
* Index and the null indicator are packed into one byte to save space.
*/
@@ -48,19 +48,19 @@ public abstract class PigNullableWritabl
* This is a bitmask used in those cases.
*/
public static final byte mqFlag = (byte)0x80;
-
+
/**
* regular indices used in group and cogroup
* can only go from 0x00 to 0x7F
*/
public static final byte idxSpace = (byte)0x7F;
-
+
private boolean mNull;
protected WritableComparable mValue;
private byte mIndex;
-
+
/**
* Compare two nullable objects. Step one is to check if either or both
* are null. If one is null and the other is not, then the one that is
@@ -71,18 +71,19 @@ public abstract class PigNullableWritabl
* These comparators are used by hadoop as part of the post-map sort, when
* the data is still in object format.
*/
+ @Override
public int compareTo(Object o) {
PigNullableWritable w = (PigNullableWritable)o;
if ((mIndex & mqFlag) != 0) { // this is a multi-query index
-
+
if ((mIndex & idxSpace) < (w.mIndex & idxSpace)) return -1;
else if ((mIndex & idxSpace) > (w.mIndex & idxSpace)) return 1;
}
-
+
if (!mNull && !w.mNull) {
int result = mValue.compareTo(w.mValue);
-
+
// If any of the field inside tuple is null, then we do not merge keys
// See PIG-927
if (result == 0 && mValue instanceof Tuple && w.mValue instanceof Tuple)
@@ -102,13 +103,14 @@ public abstract class PigNullableWritabl
else if ((mIndex & idxSpace) > (w.mIndex & idxSpace)) return 1;
else return 0;
}
- else if (mNull) return -1;
+ else if (mNull) return -1;
else return 1;
}
/* (non-Javadoc)
* @see org.apache.hadoop.io.IntWritable#readFields(java.io.DataInput)
*/
+ @Override
public void readFields(DataInput in) throws IOException {
mNull = in.readBoolean();
if (!mNull) mValue.readFields(in);
@@ -118,6 +120,7 @@ public abstract class PigNullableWritabl
/* (non-Javadoc)
* @see org.apache.hadoop.io.IntWritable#write(java.io.DataOutput)
*/
+ @Override
public void write(DataOutput out) throws IOException {
out.writeBoolean(mNull);
if (!mNull) mValue.write(out);
@@ -137,7 +140,7 @@ public abstract class PigNullableWritabl
public void setNull(boolean isNull) {
mNull = isNull;
}
-
+
/**
* @return the index for this value
*/
@@ -169,8 +172,8 @@ public abstract class PigNullableWritabl
else return mValue.hashCode();
}
-
-
+
+
@Override
public boolean equals(Object arg0) {
return compareTo(arg0)==0;
Modified: pig/branches/branch-0.12/test/org/apache/pig/test/TestPackage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/test/org/apache/pig/test/TestPackage.java?rev=1526045&r1=1526044&r2=1526045&view=diff
==============================================================================
--- pig/branches/branch-0.12/test/org/apache/pig/test/TestPackage.java (original)
+++ pig/branches/branch-0.12/test/org/apache/pig/test/TestPackage.java Tue Sep 24 22:54:42 2013
@@ -19,6 +19,10 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
@@ -34,9 +38,13 @@ import org.apache.pig.backend.hadoop.HDa
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.data.BinSedesTuple;
+import org.apache.pig.data.BinSedesTupleFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.NullablePartitionWritable;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.OperatorKey;
@@ -47,23 +55,25 @@ import org.joda.time.DateTime;
import org.junit.Test;
public class TestPackage {
+ private static final TupleFactory binfactory = BinSedesTupleFactory.getInstance();
+
private void runTest(Object key, boolean inner[], byte keyType) throws ExecException,
IOException {
Random r = new Random();
DataBag db1 = GenRandomData.genRandSmallTupDataBag(r, 10, 100);
DataBag db2 = GenRandomData.genRandSmallTupDataBag(r, 10, 100);
List<NullableTuple> db = new ArrayList<NullableTuple>(200);
- Iterator db1Iter = db1.iterator();
+ Iterator<Tuple> db1Iter = db1.iterator();
if (!inner[0]) {
while (db1Iter.hasNext()) {
- NullableTuple it = new NullableTuple((Tuple)db1Iter.next());
+ NullableTuple it = new NullableTuple(db1Iter.next());
it.setIndex((byte)0);
db.add(it);
}
}
- Iterator db2Iter = db2.iterator();
+ Iterator<Tuple> db2Iter = db2.iterator();
while (db2Iter.hasNext()) {
- NullableTuple it = new NullableTuple((Tuple)db2Iter.next());
+ NullableTuple it = new NullableTuple(db2Iter.next());
it.setIndex((byte)1);
db.add(it);
}
@@ -73,6 +83,25 @@ public class TestPackage {
pop.setInner(inner);
PigNullableWritable k = HDataType.getWritableComparableTypes(key, keyType);
pop.attachInput(k, db.iterator());
+ if (keyType != DataType.BAG) {
+ // test serialization
+ NullablePartitionWritable wr;
+ if (keyType == DataType.TUPLE) {
+ BinSedesTuple tup = (BinSedesTuple) binfactory.newTupleNoCopy(((Tuple) k.getValueAsPigType()).getAll());
+ wr = new NullablePartitionWritable(new NullableTuple(tup));
+ } else {
+ wr = new NullablePartitionWritable(k);
+ }
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(baos);
+ wr.write(out);
+ byte[] arr = baos.toByteArray();
+ ByteArrayInputStream bais = new ByteArrayInputStream(arr);
+ DataInputStream in = new DataInputStream(bais);
+ NullablePartitionWritable re = new NullablePartitionWritable();
+ re.readFields(in);
+ assertEquals(re, wr);
+ }
// we are not doing any optimization to remove
// parts of the "value" which are present in the "key" in this
@@ -87,7 +116,7 @@ public class TestPackage {
pop.setKeyInfo(keyInfo);
Tuple t = null;
Result res = null;
- res = (Result)pop.getNextTuple();
+ res = pop.getNextTuple();
if (res.returnStatus == POStatus.STATUS_NULL && inner[0])
return;
assertEquals(POStatus.STATUS_OK, res.returnStatus);
@@ -162,6 +191,7 @@ public class TestPackage {
}
}
+
@Test
public void testOperator() throws ExecException, IOException {
byte[] types = DataType.genAllTypes();