You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2013/09/25 00:54:43 UTC

svn commit: r1526045 - in /pig/branches/branch-0.12: ./ src/org/apache/pig/backend/hadoop/ src/org/apache/pig/data/ src/org/apache/pig/impl/io/ test/org/apache/pig/test/

Author: dvryaboy
Date: Tue Sep 24 22:54:42 2013
New Revision: 1526045

URL: http://svn.apache.org/r1526045
Log:
PIG-3479: Fix BigInt, BigDec, Date serialization. Improve perf of PigNullableWritable deserialization

Modified:
    pig/branches/branch-0.12/CHANGES.txt
    pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/BigDecimalWritable.java
    pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/BigIntegerWritable.java
    pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/HDataType.java
    pig/branches/branch-0.12/src/org/apache/pig/data/BinInterSedes.java
    pig/branches/branch-0.12/src/org/apache/pig/data/BinSedesTuple.java
    pig/branches/branch-0.12/src/org/apache/pig/data/DataReaderWriter.java
    pig/branches/branch-0.12/src/org/apache/pig/impl/io/NullablePartitionWritable.java
    pig/branches/branch-0.12/src/org/apache/pig/impl/io/NullableTuple.java
    pig/branches/branch-0.12/src/org/apache/pig/impl/io/PigNullableWritable.java
    pig/branches/branch-0.12/test/org/apache/pig/test/TestPackage.java

Modified: pig/branches/branch-0.12/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/CHANGES.txt?rev=1526045&r1=1526044&r2=1526045&view=diff
==============================================================================
--- pig/branches/branch-0.12/CHANGES.txt (original)
+++ pig/branches/branch-0.12/CHANGES.txt Tue Sep 24 22:54:42 2013
@@ -30,6 +30,8 @@ PIG-3174: Remove rpm and deb artifacts f
 
 IMPROVEMENTS
 
+PIG-3479: Fix BigInt, BigDec, Date serialization. Improve perf of PigNullableWritable deserilization (dvryaboy)
+
 PIG-3461: Rewrite PartitionFilterOptimizer to make it work for all the cases (aniket486)
 
 PIG-2417: Streaming UDFs - allow users to easily write UDFs in scripting languages with no

Modified: pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/BigDecimalWritable.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/BigDecimalWritable.java?rev=1526045&r1=1526044&r2=1526045&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/BigDecimalWritable.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/BigDecimalWritable.java Tue Sep 24 22:54:42 2013
@@ -58,7 +58,7 @@ public class BigDecimalWritable implemen
 
     @Override
     public void readFields(DataInput in) throws IOException {
-        value = (BigDecimal)bis.readDatum(in, DataType.BIGDECIMAL);
+        value = (BigDecimal)bis.readDatum(in);
     }
 
     @Override
@@ -76,6 +76,7 @@ public class BigDecimalWritable implemen
             super(BigDecimalWritable.class);
         }
 
+        @Override
         public int compare(byte[] b1, int s1, int l1,
                            byte[] b2, int s2, int l2) {
             try {

Modified: pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/BigIntegerWritable.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/BigIntegerWritable.java?rev=1526045&r1=1526044&r2=1526045&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/BigIntegerWritable.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/BigIntegerWritable.java Tue Sep 24 22:54:42 2013
@@ -58,7 +58,7 @@ public class BigIntegerWritable implemen
 
     @Override
     public void readFields(DataInput in) throws IOException {
-        value = (BigInteger)bis.readDatum(in, DataType.BIGINTEGER);
+        value = (BigInteger)bis.readDatum(in);
     }
 
     @Override
@@ -76,6 +76,7 @@ public class BigIntegerWritable implemen
             super(BigIntegerWritable.class);
         }
 
+        @Override
         public int compare(byte[] b1, int s1, int l1,
                            byte[] b2, int s2, int l2) {
             try {

Modified: pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/HDataType.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/HDataType.java?rev=1526045&r1=1526044&r2=1526045&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/HDataType.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/HDataType.java Tue Sep 24 22:54:42 2013
@@ -19,6 +19,8 @@ package org.apache.pig.backend.hadoop;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.pig.PigException;
@@ -60,6 +62,30 @@ public class HDataType {
     static NullableTuple defTup = new NullableTuple();
     static Map<Byte, String> typeToName = null;
 
+    private static final HashMap<String, Byte> classToTypeMap = new HashMap<String, Byte>();
+    static {
+        classToTypeMap.put("org.apache.pig.impl.io.NullableBag", DataType.BAG);
+        classToTypeMap.put("org.apache.pig.impl.io.NullableBigDecimalWritable", DataType.BIGDECIMAL);
+        classToTypeMap.put("org.apache.pig.impl.io.NullableBigIntegerWritable", DataType.BIGINTEGER);
+        classToTypeMap.put("org.apache.pig.impl.io.NullableBooleanWritable", DataType.BOOLEAN);
+        classToTypeMap.put("org.apache.pig.impl.io.NullableBytesWritable", DataType.BYTEARRAY);
+        classToTypeMap.put("org.apache.pig.impl.io.NullableDateTimeWritable", DataType.DATETIME);
+        classToTypeMap.put("org.apache.pig.impl.io.NullableDoubleWritable", DataType.DOUBLE);
+        classToTypeMap.put("org.apache.pig.impl.io.NullableFloatWritable", DataType.FLOAT);
+        classToTypeMap.put("org.apache.pig.impl.io.NullableIntWritable", DataType.INTEGER);
+        classToTypeMap.put("org.apache.pig.impl.io.NullableLongWritable", DataType.LONG);
+        classToTypeMap.put("org.apache.pig.impl.io.NullableText", DataType.CHARARRAY);
+        classToTypeMap.put("org.apache.pig.impl.io.NullableTuple", DataType.TUPLE);
+    }
+
+    public static PigNullableWritable getWritableComparable(String className) throws ExecException {
+        if (classToTypeMap.containsKey(className)) {
+            return getWritableComparableTypes(null, classToTypeMap.get(className));
+        } else {
+            throw new ExecException("Unable to map " + className + " to known types." + Arrays.toString(classToTypeMap.keySet().toArray()));
+        }
+    }
+
     public static PigNullableWritable getWritableComparableTypes(Object o, byte keyType) throws ExecException{
 
         byte newKeyType = keyType;

Modified: pig/branches/branch-0.12/src/org/apache/pig/data/BinInterSedes.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/data/BinInterSedes.java?rev=1526045&r1=1526044&r2=1526045&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/data/BinInterSedes.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/data/BinInterSedes.java Tue Sep 24 22:54:42 2013
@@ -435,7 +435,7 @@ public class BinInterSedes implements In
     }
 
     private Object readBigDecimal(DataInput in) throws IOException {
-        return new BigDecimal((String)readDatum(in));
+        return  new BigDecimal((String)readDatum(in));
     }
 
     private Object readBigInteger(DataInput in) throws IOException {

Modified: pig/branches/branch-0.12/src/org/apache/pig/data/BinSedesTuple.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/data/BinSedesTuple.java?rev=1526045&r1=1526044&r2=1526045&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/data/BinSedesTuple.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/data/BinSedesTuple.java Tue Sep 24 22:54:42 2013
@@ -34,7 +34,6 @@ public class BinSedesTuple extends Defau
 
     private static final long serialVersionUID = 1L;
     private static final InterSedes sedes = InterSedesFactory.getInterSedesInstance();
-    
 
     @Override
     public void write(DataOutput out) throws IOException {
@@ -47,8 +46,8 @@ public class BinSedesTuple extends Defau
         // Clear our fields, in case we're being reused.
         mFields.clear();
         sedes.addColsToTuple(in, this);
-    } 
-    
+    }
+
 
 
     /**
@@ -87,7 +86,7 @@ public class BinSedesTuple extends Defau
     BinSedesTuple(List<Object> c, int junk) {
         super(c, junk);
     }
-    
+
     public static Class<? extends TupleRawComparator> getComparatorClass() {
         return InterSedesFactory.getInterSedesInstance().getTupleRawComparatorClass();
     }

Modified: pig/branches/branch-0.12/src/org/apache/pig/data/DataReaderWriter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/data/DataReaderWriter.java?rev=1526045&r1=1526044&r2=1526045&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/data/DataReaderWriter.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/data/DataReaderWriter.java Tue Sep 24 22:54:42 2013
@@ -303,6 +303,7 @@ public class DataReaderWriter {
                 out.writeByte(DataType.DATETIME);
                 out.writeLong(((DateTime)val).getMillis());
                 out.writeShort(((DateTime)val).getZone().getOffset((DateTime)val) / 60000);
+                break;
 
             case DataType.BYTEARRAY: {
                 out.writeByte(DataType.BYTEARRAY);

Modified: pig/branches/branch-0.12/src/org/apache/pig/impl/io/NullablePartitionWritable.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/impl/io/NullablePartitionWritable.java?rev=1526045&r1=1526044&r2=1526045&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/impl/io/NullablePartitionWritable.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/impl/io/NullablePartitionWritable.java Tue Sep 24 22:54:42 2013
@@ -21,6 +21,8 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.pig.backend.hadoop.HDataType;
+
 /**
  * NullablePartitionWritable is an adaptor class around PigNullableWritable that adds a partition
  * index to the class.
@@ -30,9 +32,9 @@ public class NullablePartitionWritable e
 	private PigNullableWritable key;
 
 	public NullablePartitionWritable() {
-		
+
 	}
-	
+
 	public NullablePartitionWritable(PigNullableWritable k) {
 		setKey(k);
 	}
@@ -53,50 +55,60 @@ public class NullablePartitionWritable e
 		return partitionIndex;
 	}
 
-  	public int compareTo(Object o) {  		
+  	@Override
+    public int compareTo(Object o) {
 		return key.compareTo(((NullablePartitionWritable)o).getKey());
 	}
-	
-	public void readFields(DataInput in) throws IOException {
+
+	@Override
+    public void readFields(DataInput in) throws IOException {
 		String c = in.readUTF();
-		try{
-			key = (PigNullableWritable)Class.forName(c).newInstance();
-		}catch(Exception e) {
+		try {
+			key = HDataType.getWritableComparable(c);
+		} catch(Exception e) {
 			throw new IOException(e);
 		}
 		key.readFields(in);
 	}
 
-	public void write(DataOutput out) throws IOException {
+	@Override
+    public void write(DataOutput out) throws IOException {
 		out.writeUTF(key.getClass().getName());
 		key.write(out);
 	}
 
-	public boolean isNull() {
+	@Override
+    public boolean isNull() {
 		return key.isNull();
 	}
 
-	public void setNull(boolean isNull) {
+	@Override
+    public void setNull(boolean isNull) {
 		key.setNull(isNull);
 	}
 
-	public byte getIndex() {
+	@Override
+    public byte getIndex() {
 		return key.getIndex();
 	}
 
-	public void setIndex(byte index) {
+	@Override
+    public void setIndex(byte index) {
 		key.setIndex(index);
 	}
 
-	public Object getValueAsPigType() {
+	@Override
+    public Object getValueAsPigType() {
 		return key.getValueAsPigType();
 	}
 
-	public int hashCode() {
+	@Override
+    public int hashCode() {
 		return key.hashCode();
 	}
-	
-	public String toString() {
+
+	@Override
+    public String toString() {
 		return "Partition: " + partitionIndex + " " + key.toString();
 	}
 }

Modified: pig/branches/branch-0.12/src/org/apache/pig/impl/io/NullableTuple.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/impl/io/NullableTuple.java?rev=1526045&r1=1526044&r2=1526045&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/impl/io/NullableTuple.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/impl/io/NullableTuple.java Tue Sep 24 22:54:42 2013
@@ -42,6 +42,7 @@ public class NullableTuple extends PigNu
         mValue = t;
     }
 
+    @Override
     public Object getValueAsPigType() {
         return isNull() ? null : (Tuple)mValue;
     }

Modified: pig/branches/branch-0.12/src/org/apache/pig/impl/io/PigNullableWritable.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/impl/io/PigNullableWritable.java?rev=1526045&r1=1526044&r2=1526045&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/impl/io/PigNullableWritable.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/impl/io/PigNullableWritable.java Tue Sep 24 22:54:42 2013
@@ -33,7 +33,7 @@ import org.apache.pig.data.Tuple;
  * It also provides a getIndex() and setIndex() calls that are used to get
  * and set the index.  These can be used by LocalRearrange, the partitioner,
  * and Package to determine the index.
- * 
+ *
  * Index and the null indicator are packed into one byte to save space.
  */
 
@@ -48,19 +48,19 @@ public abstract class PigNullableWritabl
      * This is a bitmask used in those cases.
      */
     public static final byte mqFlag = (byte)0x80;
-    
+
     /**
      *  regular indices used in group and cogroup
      *  can only go from 0x00 to 0x7F
      */
     public static final byte idxSpace = (byte)0x7F;
-    
+
     private boolean mNull;
 
     protected WritableComparable mValue;
 
     private byte mIndex;
-       
+
     /**
      * Compare two nullable objects.  Step one is to check if either or both
      * are null.  If one is null and the other is not, then the one that is
@@ -71,18 +71,19 @@ public abstract class PigNullableWritabl
      * These comparators are used by hadoop as part of the post-map sort, when
      * the data is still in object format.
      */
+    @Override
     public int compareTo(Object o) {
         PigNullableWritable w = (PigNullableWritable)o;
 
         if ((mIndex & mqFlag) != 0) { // this is a multi-query index
-            
+
             if ((mIndex & idxSpace) < (w.mIndex & idxSpace)) return -1;
             else if ((mIndex & idxSpace) > (w.mIndex & idxSpace)) return 1;
         }
-        
+
         if (!mNull && !w.mNull) {
             int result = mValue.compareTo(w.mValue);
-            
+
             // If any of the field inside tuple is null, then we do not merge keys
             // See PIG-927
             if (result == 0 && mValue instanceof Tuple && w.mValue instanceof Tuple)
@@ -102,13 +103,14 @@ public abstract class PigNullableWritabl
             else if ((mIndex & idxSpace) > (w.mIndex & idxSpace)) return 1;
             else return 0;
         }
-        else if (mNull) return -1; 
+        else if (mNull) return -1;
         else return 1;
     }
 
     /* (non-Javadoc)
      * @see org.apache.hadoop.io.IntWritable#readFields(java.io.DataInput)
      */
+    @Override
     public void readFields(DataInput in) throws IOException {
         mNull = in.readBoolean();
         if (!mNull) mValue.readFields(in);
@@ -118,6 +120,7 @@ public abstract class PigNullableWritabl
     /* (non-Javadoc)
      * @see org.apache.hadoop.io.IntWritable#write(java.io.DataOutput)
      */
+    @Override
     public void write(DataOutput out) throws IOException {
         out.writeBoolean(mNull);
         if (!mNull) mValue.write(out);
@@ -137,7 +140,7 @@ public abstract class PigNullableWritabl
     public void setNull(boolean isNull) {
         mNull = isNull;
     }
-    
+
     /**
      * @return the index for this value
      */
@@ -169,8 +172,8 @@ public abstract class PigNullableWritabl
         else return mValue.hashCode();
     }
 
-    
-    
+
+
     @Override
     public boolean equals(Object arg0) {
         return compareTo(arg0)==0;

Modified: pig/branches/branch-0.12/test/org/apache/pig/test/TestPackage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/test/org/apache/pig/test/TestPackage.java?rev=1526045&r1=1526044&r2=1526045&view=diff
==============================================================================
--- pig/branches/branch-0.12/test/org/apache/pig/test/TestPackage.java (original)
+++ pig/branches/branch-0.12/test/org/apache/pig/test/TestPackage.java Tue Sep 24 22:54:42 2013
@@ -19,6 +19,10 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -34,9 +38,13 @@ import org.apache.pig.backend.hadoop.HDa
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.data.BinSedesTuple;
+import org.apache.pig.data.BinSedesTupleFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -47,23 +55,25 @@ import org.joda.time.DateTime;
 import org.junit.Test;
 
 public class TestPackage {
+    private static final TupleFactory binfactory = BinSedesTupleFactory.getInstance();
+
     private void runTest(Object key, boolean inner[], byte keyType) throws ExecException,
             IOException {
         Random r = new Random();
         DataBag db1 = GenRandomData.genRandSmallTupDataBag(r, 10, 100);
         DataBag db2 = GenRandomData.genRandSmallTupDataBag(r, 10, 100);
         List<NullableTuple> db = new ArrayList<NullableTuple>(200);
-        Iterator db1Iter = db1.iterator();
+        Iterator<Tuple> db1Iter = db1.iterator();
         if (!inner[0]) {
             while (db1Iter.hasNext()) {
-                NullableTuple it = new NullableTuple((Tuple)db1Iter.next());
+                NullableTuple it = new NullableTuple(db1Iter.next());
                 it.setIndex((byte)0);
                 db.add(it);
             }
         }
-        Iterator db2Iter = db2.iterator();
+        Iterator<Tuple> db2Iter = db2.iterator();
         while (db2Iter.hasNext()) {
-            NullableTuple it = new NullableTuple((Tuple)db2Iter.next());
+            NullableTuple it = new NullableTuple(db2Iter.next());
             it.setIndex((byte)1);
             db.add(it);
         }
@@ -73,6 +83,25 @@ public class TestPackage {
         pop.setInner(inner);
         PigNullableWritable k = HDataType.getWritableComparableTypes(key, keyType);
         pop.attachInput(k, db.iterator());
+        if (keyType != DataType.BAG) {
+            // test serialization
+            NullablePartitionWritable wr;
+            if (keyType == DataType.TUPLE) {
+                BinSedesTuple tup = (BinSedesTuple) binfactory.newTupleNoCopy(((Tuple) k.getValueAsPigType()).getAll());
+                wr = new NullablePartitionWritable(new NullableTuple(tup));
+            } else {
+                wr = new NullablePartitionWritable(k);
+            }
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutputStream out = new DataOutputStream(baos);
+            wr.write(out);
+            byte[] arr = baos.toByteArray();
+            ByteArrayInputStream bais = new ByteArrayInputStream(arr);
+            DataInputStream in = new DataInputStream(bais);
+            NullablePartitionWritable re = new NullablePartitionWritable();
+            re.readFields(in);
+            assertEquals(re, wr);
+        }
 
         // we are not doing any optimization to remove
         // parts of the "value" which are present in the "key" in this
@@ -87,7 +116,7 @@ public class TestPackage {
         pop.setKeyInfo(keyInfo);
         Tuple t = null;
         Result res = null;
-        res = (Result)pop.getNextTuple();
+        res = pop.getNextTuple();
         if (res.returnStatus == POStatus.STATUS_NULL && inner[0])
             return;
         assertEquals(POStatus.STATUS_OK, res.returnStatus);
@@ -162,6 +191,7 @@ public class TestPackage {
         }
     }
 
+
     @Test
     public void testOperator() throws ExecException, IOException {
         byte[] types = DataType.genAllTypes();