You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by jc...@apache.org on 2012/08/09 02:48:16 UTC

svn commit: r1371022 - in /pig/trunk: ./ src/org/apache/pig/data/ test/org/apache/pig/data/

Author: jcoveney
Date: Thu Aug  9 00:48:16 2012
New Revision: 1371022

URL: http://svn.apache.org/viewvc?rev=1371022&view=rev
Log:
[PIG-2862] Hardcode certain tuple lengths into the TUPLE BinInterSedes byte identifier (jcoveney)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/data/AbstractTuple.java
    pig/trunk/src/org/apache/pig/data/DefaultTuple.java
    pig/trunk/src/org/apache/pig/data/SchemaTuple.java
    pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java
    pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java
    pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1371022&r1=1371021&r2=1371022&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Aug  9 00:48:16 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2862: Hardcode certain tuple lengths into the TUPLE BinInterSedes byte identifier (jcoveney)
+
 PIG-2855: Provide a method to measure time spent in UDFs (dvryaboy)
 
 PIG-2837: AvroStorage throws StackOverFlowError (cheolsoo via sms)

Modified: pig/trunk/src/org/apache/pig/data/AbstractTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/AbstractTuple.java?rev=1371022&r1=1371021&r2=1371022&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/AbstractTuple.java (original)
+++ pig/trunk/src/org/apache/pig/data/AbstractTuple.java Thu Aug  9 00:48:16 2012
@@ -19,8 +19,8 @@ package org.apache.pig.data;
 
 import java.util.Iterator;
 
-import org.apache.pig.impl.util.TupleFormat;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.util.TupleFormat;
 
 import com.google.common.base.Joiner;
 
@@ -67,4 +67,9 @@ public abstract class AbstractTuple impl
     public boolean equals(Object other) {
         return (compareTo(other) == 0);
     }
+
+    @Override
+    public void reference(Tuple t) {
+        throw new RuntimeException("Tuple#reference(Tuple) is deprecated and should not be used");
+    }
 }

Modified: pig/trunk/src/org/apache/pig/data/DefaultTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/DefaultTuple.java?rev=1371022&r1=1371021&r2=1371022&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/DefaultTuple.java (original)
+++ pig/trunk/src/org/apache/pig/data/DefaultTuple.java Thu Aug  9 00:48:16 2012
@@ -93,18 +93,6 @@ public class DefaultTuple extends Abstra
     }
 
     /**
-     * Make this tuple reference the contents of another. This method does not copy the underlying data. It maintains
-     * references to the data from the original tuple (and possibly even to the data structure holding the data).
-     * 
-     * @param t
-     *            Tuple to reference.
-     */
-    @Override
-    public void reference(Tuple t) {
-        mFields = t.getAll();
-    }
-
-    /**
      * Find the size of the tuple. Used to be called arity().
      * 
      * @return number of fields in the tuple.

Modified: pig/trunk/src/org/apache/pig/data/SchemaTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTuple.java?rev=1371022&r1=1371021&r2=1371022&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTuple.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTuple.java Thu Aug  9 00:48:16 2012
@@ -153,6 +153,10 @@ public abstract class SchemaTuple<T exte
         bis.writeDatum(out, v, DataType.BAG);
     }
 
+    protected static void write(DataOutput out, Map<String, Object> v) throws IOException {
+        bis.writeDatum(out, v, DataType.MAP);
+    }
+
     protected static void write(DataOutput out, int v) throws IOException {
         SedesHelper.Varint.writeSignedVarInt(v, out);
     }
@@ -185,6 +189,11 @@ public abstract class SchemaTuple<T exte
         return (DataBag) bis.readDatum(in, DataType.BAG);
     }
 
+    @SuppressWarnings("unchecked")
+    protected static Map<String, Object> read(DataInput in, Map<String, Object> v) throws IOException {
+        return (Map<String, Object>) bis.readDatum(in, DataType.MAP);
+    }
+
     protected static int read(DataInput in, int v) throws IOException {
         return SedesHelper.Varint.readSignedVarInt(in);
     }
@@ -336,6 +345,10 @@ public abstract class SchemaTuple<T exte
         return unbox((DataBag) t);
     }
 
+    protected Map<String, Object> unbox(Object v, Map<String, Object> t) {
+        return unbox((Map<String, Object>) v); // cast the boxed value v; t is unused, as in the byte[] overload
+    }
+
     protected byte[] unbox(Object v, byte[] t) {
         return unbox((DataByteArray)v);
     }
@@ -372,6 +385,10 @@ public abstract class SchemaTuple<T exte
         return v;
     }
 
+    protected Map<String, Object> unbox(Map<String, Object> v) {
+        return v;
+    }
+
     protected byte[] unbox(DataByteArray v) {
         if (v == null) {
             return null;
@@ -403,6 +420,10 @@ public abstract class SchemaTuple<T exte
         return v;
     }
 
+    protected Map<String, Object> box(Map<String, Object> v) {
+        return v;
+    }
+
     protected DataByteArray box(byte[] v) {
         if (v == null) {
             return null;
@@ -475,6 +496,10 @@ public abstract class SchemaTuple<T exte
         return isNull ? hash : 31 * hash + v.hashCode();
     }
 
+    protected int hashCodePiece(int hash, Map<String, Object> v, boolean isNull) {
+        return isNull ? hash : 31 * hash + v.hashCode();
+    }
+
     @Override
     public int hashCode() {
         return generatedCodeHashCode();
@@ -581,6 +606,13 @@ public abstract class SchemaTuple<T exte
 
     protected abstract void generatedCodeSetDataBag(int fieldNum, DataBag val) throws ExecException;
 
+    @Override
+    public void setMap(int fieldNum, Map<String, Object> val) throws ExecException {
+        generatedCodeSetMap(fieldNum, val);
+    }
+
+    protected abstract void generatedCodeSetMap(int fieldNum, Map<String, Object> val) throws ExecException;
+
     private void errorIfNull(boolean isNull, String type) throws FieldIsNullException {
         if (isNull) {
             throw new FieldIsNullException("Desired field of type ["+type+"] was null!");
@@ -632,6 +664,11 @@ public abstract class SchemaTuple<T exte
         return val;
     }
 
+    protected Map<String, Object> returnUnlessNull(boolean isNull, Map<String, Object> val) throws FieldIsNullException {
+        errorIfNull(isNull, "Map<String,Object>");
+        return val;
+    }
+
     @Override
     public int getInt(int fieldNum) throws ExecException {
         return generatedCodeGetInt(fieldNum);
@@ -732,6 +769,18 @@ public abstract class SchemaTuple<T exte
         return (DataBag)val;
     }
 
+    @Override
+    public Map<String, Object> getMap(int fieldNum) throws ExecException {
+        return generatedCodeGetMap(fieldNum);
+    }
+
+    protected abstract Map<String, Object> generatedCodeGetMap(int fieldNum) throws ExecException;
+
+    @SuppressWarnings("unchecked")
+    protected Map<String, Object> unboxMap(Object val) {
+        return (Map<String, Object>)val;
+    }
+
     protected static Schema staticSchemaGen(String s) {
         try {
             if (s.equals("")) {
@@ -1032,6 +1081,33 @@ public abstract class SchemaTuple<T exte
         return compare(isNull, val, themNull, themVal);
     }
 
+    protected int compare(boolean usNull, Map<String, Object> usVal, boolean themNull, Map<String, Object> themVal) {
+        if (usNull && themNull) {
+            return 0;
+        } else if (themNull) {
+            return 1;
+        } else if (usNull) {
+            return -1;
+        }
+        return compare(usVal, themVal);
+    }
+
+    protected int compare(Map<String, Object> val, Map<String, Object> themVal) {
+        return DataType.compare(val, themVal, DataType.MAP, DataType.MAP);
+    }
+
+    protected int compareWithElementAtPos(boolean isNull, Map<String, Object> val, SchemaTuple<?> t, int pos) {
+        Map<String, Object> themVal; // the other tuple's map at pos
+        boolean themNull;
+        try {
+            themVal = t.getMap(pos);
+            themNull = t.isNull(pos);
+        } catch (ExecException e) {
+            throw new RuntimeException("Unable to retrieve Map field " + pos + " in given Tuple: " + t, e);
+        }
+        return compare(isNull, val, themNull, themVal); // delegate to the null-aware Map comparison
+    }
+
     protected int compareWithElementAtPos(boolean isNull, SchemaTuple<?> val, SchemaTuple<?> t, int pos) {
         Object themVal;
         boolean themNull;
@@ -1067,16 +1143,6 @@ public abstract class SchemaTuple<T exte
         public abstract A make();
     }
 
-    @NotImplemented
-    public Map<String,Object> getMap(int idx) throws ExecException {
-        throw MethodHelper.methodNotImplemented();
-    }
-
-    @NotImplemented
-    public void setMap(int idx, Map<String,Object> val) throws ExecException {
-        throw MethodHelper.methodNotImplemented();
-    }
-
     public int size() {
         return generatedCodeSize();
     }

Modified: pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java?rev=1371022&r1=1371021&r2=1371022&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java Thu Aug  9 00:48:16 2012
@@ -723,17 +723,23 @@ public class SchemaTupleClassGenerator {
             } else if (isLong() || isDouble()) {
                 size += 8;
             } else if (isBytearray()) {
-                s += "(pos_"+fieldPos+" == null ? 8 : SizeUtil.roundToEight(12 + pos_"+fieldPos+".length) * 8) + ";
-            } else if (isString()) {
-                s += "(pos_"+fieldPos+" == null ? 8 : SizeUtil.getPigObjMemSize(pos_"+fieldPos+")) + ";
+                size += 8; //the ptr
+                s += "(pos_"+fieldPos+" == null ? 0 : SizeUtil.roundToEight(12 + pos_"+fieldPos+".length) * 8) + ";
             } else if (isBoolean()) {
                 if (booleans++ % 8 == 0) {
                     size++; //accounts for the byte used to store boolean values
                 }
             } else if (isBag()) {
-                //TODO IMPLEMENT
-            } else {
+                size += 8; //the ptr
+                s += "(pos_"+fieldPos+" == null ? 0 : pos_"+fieldPos+".getMemorySize()) + ";
+            } else if (isMap() || isString()) {
+                size += 8; //the ptr
+                s += "(pos_"+fieldPos+" == null ? 0 : SizeUtil.getPigObjMemSize(pos_"+fieldPos+")) + ";
+            } else if (isTuple()) {
+                size += 8; //the ptr
                 s += "(pos_"+fieldPos+" == null ? 8 : pos_"+fieldPos+".getMemorySize()) + ";
+            } else {
+                throw new RuntimeException("Unsupported type found: " + fs);
             }
 
             if (isPrimitive() && primitives++ % 8 == 0) {
@@ -762,6 +768,8 @@ public class SchemaTupleClassGenerator {
             case (DataType.CHARARRAY): add("    return (String)null;"); break;
             case (DataType.TUPLE): add("    return (Tuple)null;"); break;
             case (DataType.BAG): add("    return (DataBag)null;"); break;
+            case (DataType.MAP): add("    return (Map<String,Object>)null;"); break;
+            default: throw new RuntimeException("Unsupported type");
             }
             add("}");
             addBreak();
@@ -1026,6 +1034,7 @@ public class SchemaTupleClassGenerator {
             listOfFutureMethods.add(new TypeAwareSetString(DataType.BOOLEAN));
             listOfFutureMethods.add(new TypeAwareSetString(DataType.TUPLE));
             listOfFutureMethods.add(new TypeAwareSetString(DataType.BAG));
+            listOfFutureMethods.add(new TypeAwareSetString(DataType.MAP));
             listOfFutureMethods.add(new TypeAwareGetString(DataType.INTEGER));
             listOfFutureMethods.add(new TypeAwareGetString(DataType.LONG));
             listOfFutureMethods.add(new TypeAwareGetString(DataType.FLOAT));
@@ -1035,6 +1044,7 @@ public class SchemaTupleClassGenerator {
             listOfFutureMethods.add(new TypeAwareGetString(DataType.BOOLEAN));
             listOfFutureMethods.add(new TypeAwareGetString(DataType.TUPLE));
             listOfFutureMethods.add(new TypeAwareGetString(DataType.BAG));
+            listOfFutureMethods.add(new TypeAwareGetString(DataType.MAP));
             listOfFutureMethods.add(new ListSetString());
 
             for (TypeInFunctionStringOut t : listOfFutureMethods) {
@@ -1051,6 +1061,7 @@ public class SchemaTupleClassGenerator {
             StringBuilder head =
                 new StringBuilder()
                     .append("import java.util.List;\n")
+                    .append("import java.util.Map;\n")
                     .append("import java.util.Iterator;\n")
                     .append("import java.io.DataOutput;\n")
                     .append("import java.io.DataInput;\n")
@@ -1156,10 +1167,6 @@ public class SchemaTupleClassGenerator {
         public void prepareProcess(Schema.FieldSchema fs) {
             type = fs.type;
 
-            if (type==DataType.MAP) {
-                throw new RuntimeException("Map currently not supported by SchemaTuple");
-            }
-
             process(fieldPos, fs);
             fieldPos++;
         }
@@ -1204,6 +1211,10 @@ public class SchemaTupleClassGenerator {
             return type == DataType.BAG;
         }
 
+        public boolean isMap() {
+            return type == DataType.MAP;
+        }
+
         public boolean isObject() {
             return !isPrimitive();
         }
@@ -1223,6 +1234,7 @@ public class SchemaTupleClassGenerator {
                 case (DataType.BOOLEAN): return "boolean";
                 case (DataType.TUPLE): return "Tuple";
                 case (DataType.BAG): return "DataBag";
+                case (DataType.MAP): return "Map";
                 default: throw new RuntimeException("Can't return String for given type " + DataType.findTypeName(type));
             }
         }

Modified: pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java?rev=1371022&r1=1371021&r2=1371022&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java Thu Aug  9 00:48:16 2012
@@ -57,10 +57,6 @@ public class SchemaTupleFactory implemen
         }
 
         for (Schema.FieldSchema fs : s.getFields()) {
-            if (fs.type == DataType.MAP) {
-                return false;
-            }
-
             if (fs.type == DataType.TUPLE && !isGeneratable(fs.schema)) {
                 return false;
             }

Modified: pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java?rev=1371022&r1=1371021&r2=1371022&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java (original)
+++ pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java Thu Aug  9 00:48:16 2012
@@ -37,6 +37,7 @@ import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 
@@ -62,6 +63,8 @@ import org.apache.pig.impl.util.Utils;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.Maps;
+
 public class TestSchemaTuple {
     private Properties props;
     private Configuration conf;
@@ -171,6 +174,10 @@ public class TestSchemaTuple {
         udfSchema = Utils.getSchemaFromString("int, b:bag{(int,int,int)}");
         SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
 
+        isAppendable = false;
+        udfSchema = Utils.getSchemaFromString("int, m:map[(int,int,int)]");
+        SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
+
         // this compiles and "ships"
         SchemaTupleFrontend.copyAllGeneratedToDistributedCache(pigContext, conf);
 
@@ -288,6 +295,11 @@ public class TestSchemaTuple {
         udfSchema = Utils.getSchemaFromString("int, b:bag{(int,int,int)}");
         tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
         putThroughPaces(tf, udfSchema, isAppendable);
+
+        isAppendable = false;
+        udfSchema = Utils.getSchemaFromString("int, m:map[(int,int,int)]");
+        tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
+        putThroughPaces(tf, udfSchema, isAppendable);
     }
 
     private void putThroughPaces(SchemaTupleFactory tf, Schema udfSchema, boolean isAppendable) throws Exception {
@@ -399,6 +411,22 @@ public class TestSchemaTuple {
                 db.add(t);
             }
             return db;
+        case DataType.MAP:
+            Map<String, Object> map = Maps.newHashMap();
+            int keys = r.nextInt(10);
+            for (int i = 0; i < keys; i++) {
+                String key = (String) randData(new FieldSchema(null, DataType.CHARARRAY));
+                int values = r.nextInt(100);
+                for (int j = 0; j < values; j++) {
+                    int tSz = r.nextInt(10);
+                    Tuple t = TupleFactory.getInstance().newTuple(tSz);
+                    for (int k = 0; k < tSz; k++) {
+                        t.set(k, r.nextInt());
+                    }
+                    map.put(key,  t);
+                }
+            }
+            return map;
         default: throw new RuntimeException("Cannot generate data for given FieldSchema: " + fs);
         }
     }
@@ -424,6 +452,7 @@ public class TestSchemaTuple {
                 case DataType.DOUBLE: st.getDouble(i); break;
                 case DataType.TUPLE: st.getTuple(i); break;
                 case DataType.BAG: st.getDataBag(i); break;
+                case DataType.MAP: st.getMap(i); break;
                 default: throw new RuntimeException("Unsupported FieldSchema in SchemaTuple: " + fs);
                 }
             } catch (FieldIsNullException e) {