You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2010/12/23 02:33:45 UTC

svn commit: r1052127 [3/3] - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ src/org/apache/pig/backend/hadoop/executionengine/physi...

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Thu Dec 23 01:33:44 2010
@@ -52,37 +52,37 @@ import org.apache.pig.pen.util.ExampleTu
 public class POLocalRearrange extends PhysicalOperator {
 
     /**
-     * 
+     *
      */
     protected static final long serialVersionUID = 1L;
 
     protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
 
     private static Log log = LogFactory.getLog(POLocalRearrange.class);
-    
+
     private static final Result ERR_RESULT = new Result();
 
     protected List<PhysicalPlan> plans;
-    
+
     protected List<PhysicalPlan> secondaryPlans;
-    
+
     protected List<ExpressionOperator> leafOps;
-    
+
     protected List<ExpressionOperator> secondaryLeafOps;
 
     // The position of this LR in the package operator
     protected byte index;
-    
+
     protected byte keyType;
-    
+
     protected byte mainKeyType;
-    
+
     protected byte secondaryKeyType;
 
     protected boolean mIsDistinct = false;
-    
+
     protected boolean isCross = false;
-    
+
     // map to store mapping of projected columns to
     // the position in the "Key" where these will be projected to.
     // We use this information to strip off these columns
@@ -93,8 +93,8 @@ public class POLocalRearrange extends Ph
     // For the first input (a), the map would contain following key:value
     // 2:0 (2 corresponds to $2 in cogroup a by ($2, $3) and 0 corresponds to 1st index in key)
     // 3:1 (3 corresponds to $3 in cogroup a by ($2, $3) and 1 corresponds to 2nd index in key)
-    private Map<Integer, Integer> mProjectedColsMap;
-    private Map<Integer, Integer> mSecondaryProjectedColsMap;
+    private final Map<Integer, Integer> mProjectedColsMap;
+    private final Map<Integer, Integer> mSecondaryProjectedColsMap;
 
     // A place holder Tuple used in distinct case where we really don't
     // have any value to pass through.  But hadoop gets cranky if we pass a
@@ -119,12 +119,12 @@ public class POLocalRearrange extends Ph
     private int mProjectedColsMapSize = 0;
     private int mSecondaryProjectedColsMapSize = 0;
 
-       
+
     private boolean useSecondaryKey = false;
-    
+
     // By default, we strip keys from the value.
     private boolean stripKeyFromValue = true;
-    
+
     public POLocalRearrange(OperatorKey k) {
         this(k, -1, null);
     }
@@ -175,9 +175,9 @@ public class POLocalRearrange extends Ph
 
     /**
      * Sets the co-group index of this operator
-     * 
-     * @param index the position of this operator in 
-     * a co-group operation 
+     *
+     * @param index the position of this operator in
+     * a co-group operation
      * @throws ExecException if the index value is bigger than 0x7F
      */
     public void setIndex(int index) throws ExecException {
@@ -186,7 +186,7 @@ public class POLocalRearrange extends Ph
 
     /**
      * Sets the multi-query index of this operator
-     * 
+     *
      * @param index the position of the parent plan of this operator
      * in the enclosed split operator
      * @throws ExecException if the index value is bigger than 0x7F
@@ -194,19 +194,19 @@ public class POLocalRearrange extends Ph
     public void setMultiQueryIndex(int index) throws ExecException {
         setIndex(index, true);
     }
-    
+
     private void setIndex(int index, boolean multiQuery) throws ExecException {
-        if (index > PigNullableWritable.idxSpace) { 
+        if (index > PigNullableWritable.idxSpace) {
             // indices in group and cogroup should only
             // be in the range 0x00 to 0x7F (only 127 possible
             // inputs)
             int errCode = 1082;
-            String msg = multiQuery? 
+            String msg = multiQuery?
                     "Merge more than 127 map-reduce jobs not supported."
                   : "Cogroups with more than 127 inputs not supported.";
             throw new ExecException(msg, errCode, PigException.INPUT);
         } else {
-            // We could potentially be sending the (key, value) relating to 
+            // We could potentially be sending the (key, value) relating to
             // multiple "group by" statements through one map reduce job
             // in  multiquery optimized execution. In this case, we want
             // two keys which have the same content but coming from different
@@ -220,10 +220,10 @@ public class POLocalRearrange extends Ph
             // contents coming from different "group by" operations would have different
             // indices and hence would go to different invocation of reduce()
             this.index = multiQuery ? (byte)(index | PigNullableWritable.mqFlag) : (byte)index;
-        }            
+        }
     }
-    
-    public boolean isDistinct() { 
+
+    public boolean isDistinct() {
         return mIsDistinct;
     }
 
@@ -233,7 +233,7 @@ public class POLocalRearrange extends Ph
             mFakeTuple = mTupleFactory.newTuple();
         }
     }
-    
+
     /**
      * Overridden since the attachment of the new input should cause the old
      * processing to end.
@@ -242,7 +242,7 @@ public class POLocalRearrange extends Ph
     public void attachInput(Tuple t) {
         super.attachInput(t);
     }
-    
+
     /**
      * Calls getNext on the generate operator inside the nested
      * physical plan. Converts the generated tuple into the proper
@@ -250,143 +250,111 @@ public class POLocalRearrange extends Ph
      */
     @Override
     public Result getNext(Tuple t) throws ExecException {
-        
+
         Result inp = null;
         Result res = ERR_RESULT;
         while (true) {
             inp = processInput();
-            if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
+            if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR) {
                 break;
-            if (inp.returnStatus == POStatus.STATUS_NULL)
+            }
+            if (inp.returnStatus == POStatus.STATUS_NULL) {
                 continue;
-            
+            }
+
             for (PhysicalPlan ep : plans) {
                 ep.attachInput((Tuple)inp.result);
             }
-            
+
             List<Result> resLst = new ArrayList<Result>();
-            
+
             if (secondaryPlans!=null) {
                 for (PhysicalPlan ep : secondaryPlans) {
                     ep.attachInput((Tuple)inp.result);
                 }
             }
-            
+
             List<Result> secondaryResLst = null;
-            if (secondaryLeafOps!=null)
+            if (secondaryLeafOps!=null) {
                 secondaryResLst = new ArrayList<Result>();
-            
+            }
+
             for (ExpressionOperator op : leafOps){
-                
+
                 switch(op.getResultType()){
                 case DataType.BAG:
-                    res = op.getNext(dummyBag);
-                    break;
                 case DataType.BOOLEAN:
-                    res = op.getNext(dummyBool);
-                    break;
                 case DataType.BYTEARRAY:
-                    res = op.getNext(dummyDBA);
-                    break;
                 case DataType.CHARARRAY:
-                    res = op.getNext(dummyString);
-                    break;
                 case DataType.DOUBLE:
-                    res = op.getNext(dummyDouble);
-                    break;
                 case DataType.FLOAT:
-                    res = op.getNext(dummyFloat);
-                    break;
                 case DataType.INTEGER:
-                    res = op.getNext(dummyInt);
-                    break;
                 case DataType.LONG:
-                    res = op.getNext(dummyLong);
-                    break;
                 case DataType.MAP:
-                    res = op.getNext(dummyMap);
-                    break;
                 case DataType.TUPLE:
-                    res = op.getNext(dummyTuple);
+                    res = op.getNext(getDummy(op.getResultType()), op.getResultType());
                     break;
                 default:
                     log.error("Invalid result type: " + DataType.findType(op.getResultType()));
                     break;
                 }
-                
+
                 // allow null as group by key
                 if (res.returnStatus != POStatus.STATUS_OK && res.returnStatus != POStatus.STATUS_NULL) {
                     return new Result();
                 }
-              
+
                 resLst.add(res);
             }
-            
+
             if (secondaryLeafOps!=null)
             {
                 for (ExpressionOperator op : secondaryLeafOps){
-                    
+
                     switch(op.getResultType()){
                     case DataType.BAG:
-                        res = op.getNext(dummyBag);
-                        break;
                     case DataType.BOOLEAN:
-                        res = op.getNext(dummyBool);
-                        break;
                     case DataType.BYTEARRAY:
-                        res = op.getNext(dummyDBA);
-                        break;
                     case DataType.CHARARRAY:
-                        res = op.getNext(dummyString);
-                        break;
                     case DataType.DOUBLE:
-                        res = op.getNext(dummyDouble);
-                        break;
                     case DataType.FLOAT:
-                        res = op.getNext(dummyFloat);
-                        break;
                     case DataType.INTEGER:
-                        res = op.getNext(dummyInt);
-                        break;
                     case DataType.LONG:
-                        res = op.getNext(dummyLong);
-                        break;
                     case DataType.MAP:
-                        res = op.getNext(dummyMap);
-                        break;
                     case DataType.TUPLE:
-                        res = op.getNext(dummyTuple);
+                        res = op.getNext(getDummy(op.getResultType()), op.getResultType());
                         break;
                     default:
                         log.error("Invalid result type: " + DataType.findType(op.getResultType()));
                         break;
                     }
-                    
+
                     // allow null as group by key
                     if (res.returnStatus != POStatus.STATUS_OK && res.returnStatus != POStatus.STATUS_NULL) {
                         return new Result();
                     }
-                    
+
                     secondaryResLst.add(res);
                 }
             }
-            
+
             // If we are using secondary sort key, our new key is:
-            // (nullable, index, (key, secondary key), value)             
-            res.result = constructLROutput(resLst,secondaryResLst,(Tuple)inp.result);            
+            // (nullable, index, (key, secondary key), value)
+            res.result = constructLROutput(resLst,secondaryResLst,(Tuple)inp.result);
             res.returnStatus = POStatus.STATUS_OK;
-            
+
             detachPlans(plans);
 
-            if(secondaryPlans != null)
+            if(secondaryPlans != null) {
                 detachPlans(secondaryPlans);
-            
+            }
+
             res.result = illustratorMarkup(inp.result, res.result, 0);
             return res;
         }
         return inp;
     }
-    
+
     private void detachPlans(List<PhysicalPlan> plans) {
         for (PhysicalPlan ep : plans) {
             ep.detachInput();
@@ -398,50 +366,52 @@ public class POLocalRearrange extends Ph
         if(resLst.size()>1){
             Tuple t = mTupleFactory.newTuple(resLst.size());
             int i=-1;
-            for(Result res : resLst)
+            for(Result res : resLst) {
                 t.set(++i, res.result);
-            key = t;           
+            }
+            key = t;
         } else if (resLst.size() == 1 && type == DataType.TUPLE) {
-            
+
             // We get here after merging multiple jobs that have different
             // map key types into a single job during multi-query optimization.
             // If the key isn't a tuple, it must be wrapped in a tuple.
             Object obj = resLst.get(0).result;
             if (obj instanceof Tuple) {
-                key = (Tuple)obj;
+                key = obj;
             } else {
                 Tuple t = mTupleFactory.newTuple(1);
                 t.set(0, resLst.get(0).result);
                 key = t;
-            }        
+            }
         }
         else{
             key = resLst.get(0).result;
         }
         return key;
     }
-    
+
     protected Tuple constructLROutput(List<Result> resLst, List<Result> secondaryResLst, Tuple value) throws ExecException{
         Tuple lrOutput = mTupleFactory.newTuple(3);
         lrOutput.set(0, Byte.valueOf(this.index));
         //Construct key
         Object key;
         Object secondaryKey=null;
-        
+
         if (secondaryResLst!=null && secondaryResLst.size()>0)
         {
             key = getKeyFromResult(resLst, mainKeyType);
             secondaryKey = getKeyFromResult(secondaryResLst, secondaryKeyType);
-        } else
+        } else {
             key = getKeyFromResult(resLst, keyType);
-        
+        }
+
 
         if(!stripKeyFromValue){
             lrOutput.set(1, key);
             lrOutput.set(2, value);
             return lrOutput;
         }
-        
+
         if (mIsDistinct) {
 
             //Put the key and the indexed tuple
@@ -453,9 +423,10 @@ public class POLocalRearrange extends Ph
                 lrOutput.set(2, mFakeTuple);
             return lrOutput;
         } else if(isCross){
-        
-            for(int i=0;i<plans.size();i++)
+
+            for(int i=0;i<plans.size();i++) {
                 value.getAll().remove(0);
+            }
             //Put the index, key, and value
             //in a tuple and return
             lrOutput.set(1, key);
@@ -471,11 +442,11 @@ public class POLocalRearrange extends Ph
                 compoundKey.set(0, key);
                 compoundKey.set(1, secondaryKey);
                 lrOutput.set(1, compoundKey);
-            }
-            else
+            } else {
                 lrOutput.set(1, key);
-            
-            // strip off the columns in the "value" which 
+            }
+
+            // strip off the columns in the "value" which
             // are present in the "key"
             if(mProjectedColsMapSize != 0 || mProjectStar == true) {
 
@@ -498,17 +469,17 @@ public class POLocalRearrange extends Ph
                     // the "value" since all elements are in the
                     // "key"
                     minimalValue = mTupleFactory.newTuple(0);
-    
+
                 }
                 lrOutput.set(2, minimalValue);
-            
+
             } else {
-            
+
                 // there were no columns in the "key"
                 // which we can strip off from the "value"
                 // so just send the value we got
                 lrOutput.set(2, value);
-                
+
             }
             return lrOutput;
         }
@@ -519,16 +490,17 @@ public class POLocalRearrange extends Ph
     }
 
     public void setKeyType(byte keyType) {
-        if (useSecondaryKey)
+        if (useSecondaryKey) {
             this.mainKeyType = keyType;
-        else
+        } else {
             this.keyType = keyType;
+        }
     }
 
     public List<PhysicalPlan> getPlans() {
         return plans;
     }
-    
+
     public void setUseSecondaryKey(boolean useSecondaryKey) {
         this.useSecondaryKey = useSecondaryKey;
         mainKeyType = keyType;
@@ -539,12 +511,12 @@ public class POLocalRearrange extends Ph
         leafOps.clear();
         int keyIndex = 0; // zero based index for fields in the key
         for (PhysicalPlan plan : plans) {
-            ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0); 
+            ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0);
             leafOps.add(leaf);
-            
+
             // don't optimize CROSS
             if(!isCross) {
-                // Look for the leaf Ops which are POProject operators - get the 
+                // Look for the leaf Ops which are POProject operators - get the
                 // columns that these POProject Operators are projecting.
                 // They MUST be projecting either a column or '*'.
                 // Keep track of the columns which are being projected and
@@ -570,16 +542,18 @@ public class POLocalRearrange extends Ph
                     } else {
                         try {
                             List<PhysicalOperator> preds = plan.getPredecessors(leaf);
-                            if (preds==null || !(preds.get(0) instanceof POProject))
+                            if (preds==null || !(preds.get(0) instanceof POProject)) {
                                 mProjectedColsMap.put(project.getColumn(), keyIndex);
+                            }
                         } catch (ExecException e) {
                             int errCode = 2070;
                             String msg = "Problem in accessing column from project operator.";
                             throw new PlanException(msg, errCode, PigException.BUG);
                         }
                     }
-                    if(project.getResultType() == DataType.TUPLE)
+                    if(project.getResultType() == DataType.TUPLE) {
                         isKeyTuple = true;
+                    }
                 }
                 keyIndex++;
             }
@@ -593,18 +567,18 @@ public class POLocalRearrange extends Ph
         }
         mProjectedColsMapSize = mProjectedColsMap.size();
     }
-    
+
     public void setSecondaryPlans(List<PhysicalPlan> plans) throws PlanException {
         this.secondaryPlans = plans;
         secondaryLeafOps.clear();
         int keyIndex = 0; // zero based index for fields in the key
         for (PhysicalPlan plan : plans) {
-            ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0); 
+            ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0);
             secondaryLeafOps.add(leaf);
-            
+
             // don't optimize CROSS
             if(!isCross) {
-                // Look for the leaf Ops which are POProject operators - get the 
+                // Look for the leaf Ops which are POProject operators - get the
                 // columns that these POProject Operators are projecting.
                 // They MUST be projecting either a column or '*'.
                 // Keep track of the columns which are being projected and
@@ -630,16 +604,18 @@ public class POLocalRearrange extends Ph
                     } else {
                         try {
                             List<PhysicalOperator> preds = plan.getPredecessors(leaf);
-                            if (preds==null || !(preds.get(0) instanceof POProject))
+                            if (preds==null || !(preds.get(0) instanceof POProject)) {
                                 mSecondaryProjectedColsMap.put(project.getColumn(), keyIndex);
+                            }
                         } catch (ExecException e) {
                             int errCode = 2070;
                             String msg = "Problem in accessing column from project operator.";
                             throw new PlanException(msg, errCode, PigException.BUG);
                         }
                     }
-                    if(project.getResultType() == DataType.TUPLE)
+                    if(project.getResultType() == DataType.TUPLE) {
                         isSecondaryKeyTuple = true;
+                    }
                 }
                 keyIndex++;
             }
@@ -653,9 +629,9 @@ public class POLocalRearrange extends Ph
         }
         mainKeyType = keyType;
         keyType = DataType.TUPLE;
-        if (plans.size()>1)
+        if (plans.size()>1) {
             secondaryKeyType = DataType.TUPLE;
-        else
+        } else
         {
             secondaryKeyType = plans.get(0).getLeaves().get(0).getResultType();
         }
@@ -663,7 +639,7 @@ public class POLocalRearrange extends Ph
     }
 
     /**
-     * Make a deep copy of this operator.  
+     * Make a deep copy of this operator.
      * @throws CloneNotSupportedException
      */
     @Override
@@ -674,7 +650,7 @@ public class POLocalRearrange extends Ph
             clonePlans.add(plan.clone());
         }
         POLocalRearrange clone = new POLocalRearrange(new OperatorKey(
-            mKey.scope, 
+            mKey.scope,
             NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
             requestedParallelism);
         try {
@@ -710,7 +686,7 @@ public class POLocalRearrange extends Ph
     public Map<Integer, Integer> getProjectedColsMap() {
         return mProjectedColsMap;
     }
-    
+
     /**
      * @return the mProjectedColsMap
      */
@@ -748,7 +724,7 @@ public class POLocalRearrange extends Ph
 
     /**
      * @param plans
-     * @throws ExecException 
+     * @throws ExecException
      */
     public void setPlansFromCombiner(List<PhysicalPlan> plans) throws PlanException {
         this.plans = plans;
@@ -756,12 +732,12 @@ public class POLocalRearrange extends Ph
         mProjectedColsMap.clear();
         int keyIndex = 0; // zero based index for fields in the key
         for (PhysicalPlan plan : plans) {
-            ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0); 
+            ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0);
             leafOps.add(leaf);
-            
+
             // don't optimize CROSS
             if(!isCross) {
-                // Look for the leaf Ops which are POProject operators - get the 
+                // Look for the leaf Ops which are POProject operators - get the
                 // columns that these POProject Operators are projecting.
                 // Keep track of the columns which are being projected and
                 // the position in the "Key" where these will be projected to.
@@ -784,8 +760,9 @@ public class POLocalRearrange extends Ph
                             throw new PlanException(msg, errCode, PigException.BUG);
                         }
                     }
-                    if(project.getResultType() == DataType.TUPLE)
+                    if(project.getResultType() == DataType.TUPLE) {
                         isKeyTuple = true;
+                    }
                 }
                 keyIndex++;
             }
@@ -798,7 +775,7 @@ public class POLocalRearrange extends Ph
             isKeyTuple  = true;
         }
         mProjectedColsMapSize  = mProjectedColsMap.size();
-        
+
     }
 
     protected void setStripKeyFromValue(boolean stripKeyFromValue) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java Thu Dec 23 01:33:44 2010
@@ -41,7 +41,7 @@ import org.apache.pig.impl.util.Utils;
 
 
 /**
- * The partition rearrange operator is a part of the skewed join 
+ * The partition rearrange operator is a part of the skewed join
  * implementation. It has an embedded physical plan that
  * generates tuples of the form (inpKey,reducerIndex,(indexed inp Tuple)).
  *
@@ -49,10 +49,10 @@ import org.apache.pig.impl.util.Utils;
 public class POPartitionRearrange extends POLocalRearrange {
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 1L;
-    
+
     private Integer totalReducers = -1;
     // ReducerMap will store the tuple, max reducer index & min reducer index
     private static Map<Object, Pair<Integer, Integer> > reducerMap = new HashMap<Object, Pair<Integer, Integer> >();
@@ -60,7 +60,7 @@ public class POPartitionRearrange extend
 
     protected static final BagFactory mBagFactory = BagFactory.getInstance();
     private PigContext pigContext;
-    
+
     public POPartitionRearrange(OperatorKey k) {
         this(k, -1, null);
     }
@@ -97,9 +97,9 @@ public class POPartitionRearrange extend
             }
         }
         try {
-          
-            Integer [] redCnt = new Integer[1]; 
-            
+
+            Integer [] redCnt = new Integer[1];
+
             reducerMap = MapRedUtil.loadPartitionFileFromLocalCache(
                     keyDistFile, redCnt, DataType.NULL);
 
@@ -126,10 +126,10 @@ public class POPartitionRearrange extend
      */
     @Override
     public Result getNext(Tuple t) throws ExecException {
-        
+
         Result inp = null;
         Result res = null;
-        
+
         // Load the skewed join key partitioning file
         if (!loaded) {
         	try {
@@ -138,77 +138,48 @@ public class POPartitionRearrange extend
         		throw new RuntimeException(e);
         	}
         }
-		
+
         while (true) {
             inp = processInput();
-            if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
+            if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR) {
                 break;
-            if (inp.returnStatus == POStatus.STATUS_NULL)
+            }
+            if (inp.returnStatus == POStatus.STATUS_NULL) {
                 continue;
-            
+            }
+
             for (PhysicalPlan ep : plans) {
                 ep.attachInput((Tuple)inp.result);
             }
             List<Result> resLst = new ArrayList<Result>();
             for (ExpressionOperator op : leafOps){
-                
-                switch(op.getResultType()){
-                case DataType.BAG:
-                    res = op.getNext(dummyBag);
-                    break;
-                case DataType.BOOLEAN:
-                    res = op.getNext(dummyBool);
-                    break;
-                case DataType.BYTEARRAY:
-                    res = op.getNext(dummyDBA);
-                    break;
-                case DataType.CHARARRAY:
-                    res = op.getNext(dummyString);
-                    break;
-                case DataType.DOUBLE:
-                    res = op.getNext(dummyDouble);
-                    break;
-                case DataType.FLOAT:
-                    res = op.getNext(dummyFloat);
-                    break;
-                case DataType.INTEGER:
-                    res = op.getNext(dummyInt);
-                    break;
-                case DataType.LONG:
-                    res = op.getNext(dummyLong);
-                    break;
-                case DataType.MAP:
-                    res = op.getNext(dummyMap);
-                    break;
-                case DataType.TUPLE:
-                    res = op.getNext(dummyTuple);
-                    break;
-                }
-                if(res.returnStatus!=POStatus.STATUS_OK)
+                res = op.getNext(getDummy(op.getResultType()), op.getResultType());
+                if(res.returnStatus!=POStatus.STATUS_OK) {
                     return new Result();
+                }
                 resLst.add(res);
             }
             res.result = constructPROutput(resLst,(Tuple)inp.result);
-            
+
             return res;
         }
         return inp;
     }
 
-	// Returns bag of tuples 
+	// Returns bag of tuples
     protected DataBag constructPROutput(List<Result> resLst, Tuple value) throws ExecException{
 		Tuple t = super.constructLROutput(resLst, null, value);
 
         //Construct key
         Object key = t.get(1);
-        
+
 		// Construct an output bag and feed in the tuples
 		DataBag opBag = mBagFactory.newDefaultBag();
 
 		//Put the index, key, and value
 		//in a tuple and return
 		Pair <Integer, Integer> indexes = reducerMap.get(key);	// first -> min, second ->max
-	
+
 		// For non skewed keys, we set the partition index to be -1
 		if (indexes == null) {
 			indexes = new Pair <Integer, Integer>(-1,0);
@@ -224,10 +195,10 @@ public class POPartitionRearrange extend
 			opTuple.set(1, reducerIdx.intValue());
 			opTuple.set(2, key);
 			opTuple.set(3, t.get(2));
-			
+
 			opBag.add(opTuple);
 		}
-		
+
 		return opBag;
     }
 
@@ -246,7 +217,7 @@ public class POPartitionRearrange extend
     }
 
     /**
-     * Make a deep copy of this operator.  
+     * Make a deep copy of this operator.
      * @throws CloneNotSupportedException
      */
     @Override

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java Thu Dec 23 01:33:44 2010
@@ -40,13 +40,13 @@ import org.apache.pig.impl.plan.VisitorE
 
 /**
  * A specialized local rearrange operator which behaves
- * like the regular local rearrange in the getNext() 
- * as far as getting its input and constructing the 
+ * like the regular local rearrange in the getNext()
+ * as far as getting its input and constructing the
  * "key" out of the input. It then returns a tuple with
  * two fields - the key in the first position and the
  * "value" inside a bag in the second position. This output
  * format resembles the format out of a Package. This output
- * will feed to a foreach which expects this format.  
+ * will feed to a foreach which expects this format.
  */
 public class POPreCombinerLocalRearrange extends PhysicalOperator {
 
@@ -56,11 +56,11 @@ public class POPreCombinerLocalRearrange
     protected static BagFactory mBagFactory = BagFactory.getInstance();
 
     private static Log log = LogFactory.getLog(POPreCombinerLocalRearrange.class);
-    
+
     private static final Result ERR_RESULT = new Result();
-    
+
     protected List<PhysicalPlan> plans;
-    
+
     protected List<ExpressionOperator> leafOps;
 
     protected byte keyType;
@@ -112,7 +112,7 @@ public class POPreCombinerLocalRearrange
     public void attachInput(Tuple t) {
         super.attachInput(t);
     }
-    
+
     /**
      * Calls getNext on the generate operator inside the nested
      * physical plan. Converts the generated tuple into the proper
@@ -120,89 +120,74 @@ public class POPreCombinerLocalRearrange
      */
     @Override
     public Result getNext(Tuple t) throws ExecException {
-        
+
         Result inp = null;
         Result res = ERR_RESULT;
         while (true) {
             inp = processInput();
-            if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
+            if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR) {
                 break;
-            if (inp.returnStatus == POStatus.STATUS_NULL)
+            }
+            if (inp.returnStatus == POStatus.STATUS_NULL) {
                 continue;
-            
+            }
+
             for (PhysicalPlan ep : plans) {
                 ep.attachInput((Tuple)inp.result);
             }
             List<Result> resLst = new ArrayList<Result>();
             for (ExpressionOperator op : leafOps){
-                
+
                 switch(op.getResultType()){
                 case DataType.BAG:
-                    res = op.getNext(dummyBag);
-                    break;
                 case DataType.BOOLEAN:
-                    res = op.getNext(dummyBool);
-                    break;
                 case DataType.BYTEARRAY:
-                    res = op.getNext(dummyDBA);
-                    break;
                 case DataType.CHARARRAY:
-                    res = op.getNext(dummyString);
-                    break;
                 case DataType.DOUBLE:
-                    res = op.getNext(dummyDouble);
-                    break;
                 case DataType.FLOAT:
-                    res = op.getNext(dummyFloat);
-                    break;
                 case DataType.INTEGER:
-                    res = op.getNext(dummyInt);
-                    break;
                 case DataType.LONG:
-                    res = op.getNext(dummyLong);
-                    break;
                 case DataType.MAP:
-                    res = op.getNext(dummyMap);
-                    break;
                 case DataType.TUPLE:
-                    res = op.getNext(dummyTuple);
+                    res = op.getNext(getDummy(op.getResultType()), op.getResultType());
                     break;
                 default:
                     log.error("Invalid result type: "
                             + DataType.findType(op.getResultType()));
                     break;
                 }
-                
+
                 // allow null as group by key
                 if (res.returnStatus != POStatus.STATUS_OK
                         && res.returnStatus != POStatus.STATUS_NULL) {
                     return new Result();
                 }
-                
+
                 resLst.add(res);
             }
             res.result = constructLROutput(resLst,(Tuple)inp.result);
             res.returnStatus = POStatus.STATUS_OK;
-            
+
             return res;
         }
         return inp;
     }
-    
+
     protected Tuple constructLROutput(List<Result> resLst, Tuple value) throws ExecException{
         //Construct key
         Object key;
         if(resLst.size()>1){
             Tuple t = mTupleFactory.newTuple(resLst.size());
             int i=-1;
-            for(Result res : resLst)
+            for(Result res : resLst) {
                 t.set(++i, res.result);
+            }
             key = t;
         }
         else{
             key = resLst.get(0).result;
         }
-        
+
         Tuple output = mTupleFactory.newTuple(2);
         output.set(0, key);
         // put the value in a bag so that the initial
@@ -229,9 +214,9 @@ public class POPreCombinerLocalRearrange
         this.plans = plans;
         leafOps.clear();
         for (PhysicalPlan plan : plans) {
-            ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0); 
+            ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0);
             leafOps.add(leaf);
-        }            
+        }
     }
 
     @Override

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Thu Dec 23 01:33:44 2010
@@ -37,14 +37,11 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
-import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.data.InternalSortedBag;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.VisitorException;
@@ -55,13 +52,13 @@ import org.apache.pig.pen.util.LineageTr
  * This implementation is applicable for both the physical plan and for the
  * local backend, as the conversion of physical to mapreduce would see the SORT
  * operator and take necessary steps to convert it to a quantile and a sort job.
- * 
+ *
  * This is a blocking operator. The sortedDataBag accumulates Tuples and sorts
  * them only when an iterator is started. So all the tuples from the input
  * operator should be accumulated and filled into the dataBag. The attachInput
  * method is not applicable here.
- * 
- * 
+ *
+ *
  */
 
 //We intentionally skip type checking in backend for performance reasons
@@ -69,7 +66,7 @@ import org.apache.pig.pen.util.LineageTr
 public class POSort extends PhysicalOperator {
 
 	/**
-     * 
+     *
      */
     private static final long serialVersionUID = 1L;
     //private List<Integer> mSortCols;
@@ -85,9 +82,9 @@ public class POSort extends PhysicalOper
 	public boolean isUDFComparatorUsed = false;
 	private DataBag sortedBag;
 	transient Iterator<Tuple> it;
-	
+
 	private SortInfo sortInfo;
-	
+
 	public POSort(
             OperatorKey k,
             int rp,
@@ -137,18 +134,20 @@ public class POSort extends PhysicalOper
 		super(k);
 
 	}
-	
+
 	public class SortComparator implements Comparator<Tuple>,Serializable {
 		/**
-         * 
+         *
          */
         private static final long serialVersionUID = 1L;
 
+        @Override
         public int compare(Tuple o1, Tuple o2) {
 			int count = 0;
 			int ret = 0;
-			if(sortPlans == null || sortPlans.size() == 0) 
-				return 0;
+			if(sortPlans == null || sortPlans.size() == 0) {
+                return 0;
+            }
 			for(PhysicalPlan plan : sortPlans) {
 				try {
 					plan.attachInput(o1);
@@ -175,48 +174,36 @@ public class POSort extends PhysicalOper
                         }
 
 					}
-						
+
 				} catch (ExecException e) {
 					log.error("Invalid result while executing the expression plan : " + plan.toString() + "\n" + e.getMessage());
 				}
 			}
 			return ret;
-		} 
-		
+		}
+
 		private Result getResult(PhysicalPlan plan, byte resultType) throws ExecException {
 			ExpressionOperator Op = (ExpressionOperator) plan.getLeaves().get(0);
 			Result res = null;
-			
+
 			switch (resultType) {
             case DataType.BYTEARRAY:
-                res = Op.getNext(dummyDBA);
-                break;
             case DataType.CHARARRAY:
-                res = Op.getNext(dummyString);
-                break;
             case DataType.DOUBLE:
-                res = Op.getNext(dummyDouble);
-                break;
             case DataType.FLOAT:
-                res = Op.getNext(dummyFloat);
-                break;
             case DataType.INTEGER:
-                res = Op.getNext(dummyInt);
-                break;
             case DataType.LONG:
-                res = Op.getNext(dummyLong);
-                break;
             case DataType.TUPLE:
-                res = Op.getNext(dummyTuple);
+                res = Op.getNext(getDummy(resultType), resultType);
                 break;
 
             default: {
                 int errCode = 2082;
                 String msg = "Did not expect result of type: " +
                         DataType.findTypeName(resultType);
-                    throw new ExecException(msg, errCode, PigException.BUG);                
+                    throw new ExecException(msg, errCode, PigException.BUG);
             }
-            
+
             }
 			return res;
 		}
@@ -225,10 +212,11 @@ public class POSort extends PhysicalOper
 	public class UDFSortComparator implements Comparator<Tuple>,Serializable {
 
 		/**
-         * 
+         *
          */
         private static final long serialVersionUID = 1L;
 
+        @Override
         public int compare(Tuple t1, Tuple t2) {
 
 			mSortFunc.attachInput(t1, t2);
@@ -241,10 +229,11 @@ public class POSort extends PhysicalOper
 				log.error("Input not ready. Error on reading from input. "
 						+ e.getMessage());
 			}
-			if (res != null)
-				return (Integer) res.result;
-			else
-				return 0;
+			if (res != null) {
+                return (Integer) res.result;
+            } else {
+                return 0;
+            }
 		}
 
 	}
@@ -266,21 +255,21 @@ public class POSort extends PhysicalOper
 	@Override
 	public Result getNext(Tuple t) throws ExecException {
 		Result res = new Result();
-		
+
 		if (!inputsAccumulated) {
-			res = processInput();         
+			res = processInput();
 			// by default, we create InternalSortedBag, unless user configures
 			// explicitly to use old bag
 			String bagType = null;
 	        if (PigMapReduce.sJobConf != null) {
-	   			bagType = PigMapReduce.sJobConf.get("pig.cachedbag.sort.type");       			
-	   	    }	        
-            if (bagType != null && bagType.equalsIgnoreCase("default")) {        	    	
-            	sortedBag = BagFactory.getInstance().newSortedBag(mComparator);     			
+	   			bagType = PigMapReduce.sJobConf.get("pig.cachedbag.sort.type");
+	   	    }
+            if (bagType != null && bagType.equalsIgnoreCase("default")) {
+            	sortedBag = BagFactory.getInstance().newSortedBag(mComparator);
        	    } else {
     	    	sortedBag = new InternalSortedBag(3, mComparator);
     	    }
-            
+
 			while (res.returnStatus != POStatus.STATUS_EOP) {
 				if (res.returnStatus == POStatus.STATUS_ERR) {
 					log.error("Error in reading from the inputs");
@@ -357,17 +346,17 @@ public class POSort extends PhysicalOper
     public List<Boolean> getMAscCols() {
         return mAscCols;
     }
-    
+
     public void setLimit(long l)
     {
     	limit = l;
     }
-    
+
     public long getLimit()
     {
     	return limit;
     }
-    
+
     public boolean isLimited()
     {
     	return (limit!=-1);
@@ -389,7 +378,7 @@ public class POSort extends PhysicalOper
             cloneFunc = mSortFunc.clone();
         }
         // Don't set inputs as PhysicalPlan.clone will take care of that
-        return new POSort(new OperatorKey(mKey.scope, 
+        return new POSort(new OperatorKey(mKey.scope,
             NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
             requestedParallelism, null, clonePlans, cloneAsc, cloneFunc);
     }

Modified: pig/trunk/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/DataType.java?rev=1052127&r1=1052126&r2=1052127&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/DataType.java (original)
+++ pig/trunk/src/org/apache/pig/data/DataType.java Thu Dec 23 01:33:44 2010
@@ -54,7 +54,7 @@ public class DataType {
     // IMPORTANT! Order matters here, as compare() below uses the order to
     // order unlike datatypes.  Don't change this ordering.
     // Spaced unevenly to leave room for new entries without changing
-    // values or creating order issues.  
+    // values or creating order issues.
     public static final byte UNKNOWN   =   0;
     public static final byte NULL      =   1;
     public static final byte BOOLEAN   =   5; // internal use only
@@ -72,14 +72,14 @@ public class DataType {
     public static final byte MAP       = 100;
     public static final byte TUPLE     = 110;
     public static final byte BAG       = 120;
-    
+
     /**
-     * Internal use only; used to store WriteableComparable objects 
+     * Internal use only; used to store WriteableComparable objects
      * for creating ordered index in MergeJoin. Expecting an object that
      * implements Writable interface and has default constructor
      */
-    public static final byte GENERIC_WRITABLECOMPARABLE = 123; 
-    
+    public static final byte GENERIC_WRITABLECOMPARABLE = 123;
+
     /**
      * Internal use only.
      */
@@ -92,23 +92,38 @@ public class DataType {
      * @return byte code of the type, or ERROR if we don't know.
      */
     public static byte findType(Object o) {
-        if (o == null) return NULL;
+        if (o == null) {
+            return NULL;
+        }
 
         // Try to put the most common first
-        if (o instanceof DataByteArray) return BYTEARRAY;
-        else if (o instanceof String) return CHARARRAY;
-        else if (o instanceof Tuple) return TUPLE;
-        else if (o instanceof DataBag) return BAG;
-        else if (o instanceof Integer) return INTEGER;
-        else if (o instanceof Long) return LONG;
-        else if (o instanceof InternalMap) return INTERNALMAP;
-        else if (o instanceof Map) return MAP;
-        else if (o instanceof Float) return FLOAT;
-        else if (o instanceof Double) return DOUBLE;
-        else if (o instanceof Boolean) return BOOLEAN;
-        else if (o instanceof Byte) return BYTE;
-        else if (o instanceof WritableComparable) return GENERIC_WRITABLECOMPARABLE;
-        else {return ERROR;}
+        if (o instanceof DataByteArray) {
+            return BYTEARRAY;
+        } else if (o instanceof String) {
+            return CHARARRAY;
+        } else if (o instanceof Tuple) {
+            return TUPLE;
+        } else if (o instanceof DataBag) {
+            return BAG;
+        } else if (o instanceof Integer) {
+            return INTEGER;
+        } else if (o instanceof Long) {
+            return LONG;
+        } else if (o instanceof InternalMap) {
+            return INTERNALMAP;
+        } else if (o instanceof Map) {
+            return MAP;
+        } else if (o instanceof Float) {
+            return FLOAT;
+        } else if (o instanceof Double) {
+            return DOUBLE;
+        } else if (o instanceof Boolean) {
+            return BOOLEAN;
+        } else if (o instanceof Byte) {
+            return BYTE;
+        } else if (o instanceof WritableComparable) {
+            return GENERIC_WRITABLECOMPARABLE;
+        } else {return ERROR;}
     }
 
     /**
@@ -118,19 +133,30 @@ public class DataType {
      * @return byte code of the type, or ERROR if we don't know.
      */
     public static byte findType(Type t) {
-        if (t == null) return NULL;
+        if (t == null) {
+            return NULL;
+        }
 
         // Try to put the most common first
-        if (t == DataByteArray.class) return BYTEARRAY;
-        else if (t == String.class) return CHARARRAY;
-        else if (t == Integer.class) return INTEGER;
-        else if (t == Long.class) return LONG;
-        else if (t == Float.class) return FLOAT;
-        else if (t == Double.class) return DOUBLE;
-        else if (t == Boolean.class) return BOOLEAN;
-        else if (t == Byte.class) return BYTE;
-        else if (t == InternalMap.class) return INTERNALMAP;
-        else {
+        if (t == DataByteArray.class) {
+            return BYTEARRAY;
+        } else if (t == String.class) {
+            return CHARARRAY;
+        } else if (t == Integer.class) {
+            return INTEGER;
+        } else if (t == Long.class) {
+            return LONG;
+        } else if (t == Float.class) {
+            return FLOAT;
+        } else if (t == Double.class) {
+            return DOUBLE;
+        } else if (t == Boolean.class) {
+            return BOOLEAN;
+        } else if (t == Byte.class) {
+            return BYTE;
+        } else if (t == InternalMap.class) {
+            return INTERNALMAP;
+        } else {
             // Might be a tuple or a bag, need to check the interfaces it
             // implements
             if (t instanceof Class) {
@@ -170,12 +196,13 @@ public class DataType {
 		        matchedWritableComparable = true;
                     }
 		}
-		if(matchedWritableComparable)
-		    return GENERIC_WRITABLECOMPARABLE;
-		
+		if(matchedWritableComparable) {
+            return GENERIC_WRITABLECOMPARABLE;
+        }
+
 		return ERROR;
 	}
-    
+
     /**
      * Return the number of types Pig knows about.
      * @return number of types
@@ -190,23 +217,23 @@ public class DataType {
      * @return byte array with an entry for each type.
      */
     public static byte[] genAllTypes(){
-        byte[] types = { DataType.BAG, DataType.BIGCHARARRAY, DataType.BOOLEAN, DataType.BYTE, DataType.BYTEARRAY, 
-                DataType.CHARARRAY, DataType.DOUBLE, DataType.FLOAT, 
+        byte[] types = { DataType.BAG, DataType.BIGCHARARRAY, DataType.BOOLEAN, DataType.BYTE, DataType.BYTEARRAY,
+                DataType.CHARARRAY, DataType.DOUBLE, DataType.FLOAT,
                 DataType.GENERIC_WRITABLECOMPARABLE,
-                DataType.INTEGER, DataType.INTERNALMAP, 
+                DataType.INTEGER, DataType.INTERNALMAP,
                 DataType.LONG, DataType.MAP, DataType.TUPLE};
         return types;
     }
-    
+
     private static String[] genAllTypeNames(){
-        String[] names = { "BAG", "BIGCHARARRAY", "BOOLEAN", "BYTE", "BYTEARRAY", 
-                "CHARARRAY", "DOUBLE", "FLOAT", 
+        String[] names = { "BAG", "BIGCHARARRAY", "BOOLEAN", "BYTE", "BYTEARRAY",
+                "CHARARRAY", "DOUBLE", "FLOAT",
                 "GENERIC_WRITABLECOMPARABLE",
                 "INTEGER","INTERNALMAP",
                 "LONG", "MAP", "TUPLE" };
         return names;
     }
-    
+
     /**
      * Get a map of type values to type names.
      * @return map
@@ -243,7 +270,7 @@ public class DataType {
     public static String findTypeName(Object o) {
         return findTypeName(findType(o));
     }
-    
+
     /**
      * Get the type name from the type byte code
      * @param dt Type byte code
@@ -333,7 +360,7 @@ public class DataType {
      * @return true if the type can have a valid schema (i.e., bag or tuple)
      */
     public static boolean isSchemaType(byte dataType) {
-        return ((dataType == BAG) || (dataType == TUPLE)); 
+        return ((dataType == BAG) || (dataType == TUPLE));
     }
 
     /**
@@ -357,10 +384,10 @@ public class DataType {
     }
 
     /**
-     * Same as {@link #compare(Object, Object)}, but does not use reflection to determine the type 
+     * Same as {@link #compare(Object, Object)}, but does not use reflection to determine the type
      * of passed in objects, relying instead on the caller to provide the appropriate values, as
      * determined by {@link DataType#findType(Object)}.
-     * 
+     *
      * Use this version in cases where multiple objects of the same type have to be repeatedly compared.
      * @param o1 first object
      * @param o2 second object
@@ -394,7 +421,7 @@ public class DataType {
                 return ((Double)o1).compareTo((Double)o2);
 
             case BYTEARRAY:
-                return ((DataByteArray)o1).compareTo((DataByteArray)o2);
+                return ((DataByteArray)o1).compareTo(o2);
 
             case CHARARRAY:
                 return ((String)o1).compareTo((String)o2);
@@ -406,7 +433,7 @@ public class DataType {
                 int sz2 = m2.size();
                 if (sz1 < sz2) {
                     return -1;
-                } else if (sz1 > sz2) { 
+                } else if (sz1 > sz2) {
                     return 1;
                 } else {
                     // This is bad, but we have to sort the keys of the maps in order
@@ -428,7 +455,7 @@ public class DataType {
                             if (c != 0) {
                                 return c;
                             }
-                        } 
+                        }
                     }
                     return 0;
                 }
@@ -439,13 +466,13 @@ public class DataType {
 
             case INTERNALMAP:
                 return -1;  // Don't think anyone will want to do this.
-                
+
             case TUPLE:
-                return ((Tuple)o1).compareTo((Tuple)o2);
+                return ((Tuple)o1).compareTo(o2);
 
             case BAG:
-                return ((DataBag)o1).compareTo((DataBag)o2);
-                
+                return ((DataBag)o1).compareTo(o2);
+
 
             default:
                 throw new RuntimeException("Unkown type " + dt1 +
@@ -457,7 +484,7 @@ public class DataType {
             return 1;
         }
     }
-    
+
     public static byte[] toBytes(Object o) throws ExecException {
         return toBytes(o, findType(o));
     }
@@ -513,8 +540,11 @@ public class DataType {
         try {
 			switch (type) {
 			case BOOLEAN:
-			    if (((Boolean)o) == true) return Integer.valueOf(1);
-			    else return Integer.valueOf(0);
+			    if (((Boolean)o) == true) {
+                    return Integer.valueOf(1);
+                } else {
+                    return Integer.valueOf(0);
+                }
 
 			case BYTE:
 			    return Integer.valueOf(((Byte)o).intValue());
@@ -573,7 +603,7 @@ public class DataType {
      * forced to an Integer.  This isn't particularly efficient, so if you
      * already <b>know</b> that the object you have is an Integer you
      * should just cast it.  Unlike {@link #toInteger(Object, byte)} this
-     * method will first determine the type of o and then do the cast.  
+     * method will first determine the type of o and then do the cast.
      * Use {@link #toInteger(Object, byte)} if you already know the type.
      * @param o object to cast
      * @return The object as an Integer.
@@ -599,8 +629,11 @@ public class DataType {
         try {
 			switch (type) {
 			case BOOLEAN:
-			    if (((Boolean)o) == true) return Long.valueOf(1);
-			    else return Long.valueOf(0);
+			    if (((Boolean)o) == true) {
+                    return Long.valueOf(1);
+                } else {
+                    return Long.valueOf(0);
+                }
 
 			case BYTE:
 			    return Long.valueOf(((Byte)o).longValue());
@@ -660,7 +693,7 @@ public class DataType {
      * forced to a Long.  This isn't particularly efficient, so if you
      * already <b>know</b> that the object you have is a Long you
      * should just cast it.  Unlike {@link #toLong(Object, byte)} this
-     * method will first determine the type of o and then do the cast.  
+     * method will first determine the type of o and then do the cast.
      * Use {@link #toLong(Object, byte)} if you already know the type.
      * @param o object to cast
      * @return The object as a Long.
@@ -741,7 +774,7 @@ public class DataType {
      * forced to a Float.  This isn't particularly efficient, so if you
      * already <b>know</b> that the object you have is a Float you
      * should just cast it.  Unlike {@link #toFloat(Object, byte)} this
-     * method will first determine the type of o and then do the cast.  
+     * method will first determine the type of o and then do the cast.
      * Use {@link #toFloat(Object, byte)} if you already know the type.
      * @param o object to cast
      * @return The object as a Float.
@@ -822,7 +855,7 @@ public class DataType {
      * forced to a Double.  This isn't particularly efficient, so if you
      * already <b>know</b> that the object you have is a Double you
      * should just cast it.  Unlike {@link #toDouble(Object, byte)} this
-     * method will first determine the type of o and then do the cast.  
+     * method will first determine the type of o and then do the cast.
      * Use {@link #toDouble(Object, byte)} if you already know the type.
      * @param o object to cast
      * @return The object as a Double.
@@ -901,7 +934,7 @@ public class DataType {
      * forced to a String.  This isn't particularly efficient, so if you
      * already <b>know</b> that the object you have is a String you
      * should just cast it.  Unlike {@link #toString(Object, byte)} this
-     * method will first determine the type of o and then do the cast.  
+     * method will first determine the type of o and then do the cast.
      * Use {@link #toString(Object, byte)} if you already know the type.
      * @param o object to cast
      * @return The object as a String.
@@ -922,7 +955,9 @@ public class DataType {
      */
     @SuppressWarnings("unchecked")
     public static Map<String, Object> toMap(Object o) throws ExecException {
-        if (o == null) return null;
+        if (o == null) {
+            return null;
+        }
 
         if (o instanceof Map && !(o instanceof InternalMap)) {
             try {
@@ -950,7 +985,9 @@ public class DataType {
      * @throws ExecException if the type can't be forced to a Tuple.
      */
     public static Tuple toTuple(Object o) throws ExecException {
-        if (o == null) return null;
+        if (o == null) {
+            return null;
+        }
 
         if (o instanceof Tuple) {
             try {
@@ -978,7 +1015,9 @@ public class DataType {
      * @throws ExecException if the type can't be forced to a DataBag.
      */
     public static DataBag toBag(Object o) throws ExecException {
-        if (o == null) return null;
+        if (o == null) {
+            return null;
+        }
 
         if (o instanceof DataBag) {
             try {
@@ -1007,7 +1046,7 @@ public class DataType {
         }
         System.out.println(t.toString());
     }
-    
+
     /**
      * Determine if this type is a numeric type.
      * @param t type (as byte value) to test
@@ -1020,9 +1059,9 @@ public class DataType {
             case FLOAT:     return true ;
             case DOUBLE:    return true ;
             default: return false ;
-        }        
+        }
     }
-    
+
     /**
      * Determine if this is a type that work can be done on.
      * @param t type (as a byte value) to test
@@ -1038,7 +1077,7 @@ public class DataType {
     }
 
     /**
-     * Merge types if possible.  Merging types means finding a type that one 
+     * Merge types if possible.  Merging types means finding a type that one
      * or both types can be upcast to.
      * @param type1
      * @param type2
@@ -1074,7 +1113,7 @@ public class DataType {
         // else return just ERROR
         return DataType.ERROR ;
     }
-    
+
     /**
      * Given a map, turn it into a String.
      * @param m map
@@ -1110,15 +1149,23 @@ public class DataType {
      * the same bytes.
      */
     public static boolean equalByteArrays(byte[] lhs, byte[] rhs) {
-        if(lhs == null && rhs == null) return true;
-        if(lhs == null || rhs == null) return false;
-        if(lhs.length != rhs.length) return false;
+        if(lhs == null && rhs == null) {
+            return true;
+        }
+        if(lhs == null || rhs == null) {
+            return false;
+        }
+        if(lhs.length != rhs.length) {
+            return false;
+        }
         for(int i = 0; i < lhs.length; ++i) {
-            if(lhs[i] != rhs[i]) return false;
+            if(lhs[i] != rhs[i]) {
+                return false;
+            }
         }
         return true;
     }
-       
+
 
     /**
      * Utility method that determines the schema from the passed in dataType.
@@ -1135,7 +1182,7 @@ public class DataType {
      * @throws FrontendException
      * @throws SchemaMergeException
      */
-    private static Schema.FieldSchema determineFieldSchema(byte dataType, Iterator fieldIter, 
+    private static Schema.FieldSchema determineFieldSchema(byte dataType, Iterator fieldIter,
             long fieldNum, Class klass ) throws ExecException, FrontendException, SchemaMergeException {
         switch (dataType) {
         case NULL:
@@ -1148,14 +1195,14 @@ public class DataType {
         case DOUBLE:
         case BYTEARRAY:
         case CHARARRAY:
-        case MAP: 
+        case MAP:
             return new Schema.FieldSchema(null, dataType);
         case TUPLE: {
             Schema schema = null;
             if(fieldNum != 0) {
                 schema = new Schema();
                 for(int i = 0; i < fieldNum; ++i) {
-                    schema.add(determineFieldSchema(klass.cast(fieldIter.next()))); 
+                    schema.add(determineFieldSchema(klass.cast(fieldIter.next())));
                 }
             }
             return new Schema.FieldSchema(null, schema, TUPLE);
@@ -1187,7 +1234,7 @@ public class DataType {
                         bagSchema.setTwoLevelAccessRequired(true);
                         return new Schema.FieldSchema(null, bagSchema, BAG);
                     }
-                    schema = Schema.mergeSchema(schema, currSchema, false, false, false); 
+                    schema = Schema.mergeSchema(schema, currSchema, false, false, false);
                 }
                 Schema.FieldSchema tupleFs = new Schema.FieldSchema(null, schema, TUPLE);
                 bagSchema = new Schema(tupleFs);
@@ -1203,21 +1250,21 @@ public class DataType {
             String msg = "Cannot determine field schema";
             throw new ExecException(msg, errCode, PigException.INPUT);
         }
-        
+
         }
     }
-    
+
     /***
      * Determine the field schema of a ResourceFieldSchema
      * @param rcFieldSchema the rcFieldSchema we want translated
      * @return the field schema corresponding to the object
      * @throws ExecException,FrontendException,SchemaMergeException
      */
-    public static Schema.FieldSchema determineFieldSchema(ResourceSchema.ResourceFieldSchema rcFieldSchema) 
+    public static Schema.FieldSchema determineFieldSchema(ResourceSchema.ResourceFieldSchema rcFieldSchema)
         throws ExecException, FrontendException, SchemaMergeException {
         byte dt = rcFieldSchema.getType();
         Iterator<ResourceSchema.ResourceFieldSchema> fieldIter = null;
-        long fieldNum = 0;        
+        long fieldNum = 0;
         if (dt == TUPLE || dt == BAG ) {
             fieldIter = Arrays.asList(rcFieldSchema.getSchema().getFields()).iterator();
             fieldNum = rcFieldSchema.getSchema().getFields().length;
@@ -1225,14 +1272,14 @@ public class DataType {
         return determineFieldSchema(dt, fieldIter, fieldNum, ResourceSchema.ResourceFieldSchema.class);
     }
 
-    
+
     /***
      * Determine the field schema of an object
      * @param o the object whose field schema is to be determined
      * @return the field schema corresponding to the object
      * @throws ExecException,FrontendException,SchemaMergeException
      */
-    public static Schema.FieldSchema determineFieldSchema(Object o) 
+    public static Schema.FieldSchema determineFieldSchema(Object o)
         throws ExecException, FrontendException, SchemaMergeException {
         byte dt = findType(o);
         Iterator fieldIter = null;