You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/22 09:43:46 UTC

svn commit: r1783988 [9/24] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelo...

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java Wed Feb 22 09:43:41 2017
@@ -28,6 +28,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
@@ -89,6 +90,8 @@ public class POCast extends ExpressionOp
                 caster = ((LoadFunc)obj).getLoadCaster();
             } else if (obj instanceof StreamToPig) {
                 caster = ((StreamToPig)obj).getLoadCaster();
+            } else if (obj instanceof EvalFunc) {
+                caster = ((EvalFunc)obj).getLoadCaster();
             } else {
                 throw new IOException("Invalid class type "
                         + funcSpec.getClassName());
@@ -165,7 +168,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToBigInteger(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "BigInteger.";
+                        String msg = unknownByteArrayErrorMessage + "BigInteger for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -281,7 +284,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToBigDecimal(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "BigDecimal.";
+                        String msg = unknownByteArrayErrorMessage + "BigDecimal for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -396,7 +399,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToBoolean(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "boolean.";
+                        String msg = unknownByteArrayErrorMessage + "boolean for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -510,7 +513,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToInteger(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "int.";
+                        String msg = unknownByteArrayErrorMessage + "int for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -636,7 +639,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToLong(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "long.";
+                        String msg = unknownByteArrayErrorMessage + "long for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -759,7 +762,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToDouble(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "double.";
+                        String msg = unknownByteArrayErrorMessage + "double for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -881,7 +884,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToFloat(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "float.";
+                        String msg = unknownByteArrayErrorMessage + "float for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -1007,7 +1010,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToDateTime(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "datetime.";
+                        String msg = unknownByteArrayErrorMessage + "datetime for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -1118,7 +1121,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToCharArray(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "string.";
+                        String msg = unknownByteArrayErrorMessage + "string for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -1270,7 +1273,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToTuple(dba.get(), fieldSchema);
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "tuple.";
+                        String msg = unknownByteArrayErrorMessage + "tuple for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -1332,7 +1335,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToBag(((DataByteArray)obj).get(), fs);
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "bag.";
+                    String msg = unknownByteArrayErrorMessage + "bag for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
             } else {
@@ -1363,7 +1366,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToTuple(((DataByteArray)obj).get(), fs);
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "tuple.";
+                    String msg = unknownByteArrayErrorMessage + "tuple for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
             } else {
@@ -1388,7 +1391,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToMap(((DataByteArray)obj).get(), fs);
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "tuple.";
+                    String msg = unknownByteArrayErrorMessage + "map for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
             } else {
@@ -1402,7 +1405,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToBoolean(((DataByteArray) obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "int.";
+                    String msg = unknownByteArrayErrorMessage + "boolean for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1441,7 +1444,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToInteger(((DataByteArray) obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "int.";
+                    String msg = unknownByteArrayErrorMessage + "int for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1487,7 +1490,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToDouble(((DataByteArray) obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "double.";
+                    String msg = unknownByteArrayErrorMessage + "double for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1533,7 +1536,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToLong(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "long.";
+                    String msg = unknownByteArrayErrorMessage + "long for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1579,7 +1582,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToFloat(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "float.";
+                    String msg = unknownByteArrayErrorMessage + "float for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1625,7 +1628,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToDateTime(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "datetime.";
+                    String msg = unknownByteArrayErrorMessage + "datetime for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1664,7 +1667,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToCharArray(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "float.";
+                    String msg = unknownByteArrayErrorMessage + "string for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1712,7 +1715,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToBigInteger(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "BigInteger.";
+                    String msg = unknownByteArrayErrorMessage + "BigInteger for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1757,7 +1760,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToBigDecimal(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "BigDecimal.";
+                    String msg = unknownByteArrayErrorMessage + "BigDecimal for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1795,6 +1798,11 @@ public class POCast extends ExpressionOp
             default:
                 throw new ExecException("Cannot convert "+ obj + " to " + fs, 1120, PigException.INPUT);
             }
+            break; // converted value is already in result; do not fall through into the BYTEARRAY no-op
+        case DataType.BYTEARRAY:
+            //no-op (PIG-4933)
+            result = obj;
+            break;
         default:
             throw new ExecException("Don't know how to convert "+ obj + " to " + fs, 1120, PigException.INPUT);
         }
@@ -1861,7 +1869,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToBag(dba.get(), fieldSchema);
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "bag.";
+                        String msg = unknownByteArrayErrorMessage + "bag for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -1952,7 +1960,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToMap(dba.get(), fieldSchema);
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "map.";
+                        String msg = unknownByteArrayErrorMessage + "map for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Wed Feb 22 09:43:41 2017
@@ -158,23 +158,19 @@ public class POProject extends Expressio
             illustratorMarkup(inpValue, res.result, -1);
             return res;
         } else if(columns.size() == 1) {
-            try {
+            if ( inpValue == null ) {
+                // the tuple is null, so a dereference should also produce a null
+                res.returnStatus = POStatus.STATUS_OK;
+                ret = null;
+            } else if( inpValue.size() > columns.get(0) ) {
                 ret = inpValue.get(columns.get(0));
-            } catch (IndexOutOfBoundsException ie) {
+            } else {
                 if(pigLogger != null) {
                     pigLogger.warn(this,"Attempt to access field " +
                             "which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD);
                 }
                 res.returnStatus = POStatus.STATUS_OK;
                 ret = null;
-            } catch (NullPointerException npe) {
-                // the tuple is null, so a dereference should also produce a null
-                // there is a slight danger here that the Tuple implementation
-                // may have given the exception for a different reason but if we
-                // don't catch it, we will die and the most common case for the
-                // exception would be because the tuple is null
-                res.returnStatus = POStatus.STATUS_OK;
-                ret = null;
             }
         } else if(isProjectToEnd){
             ret = getRangeTuple(inpValue);
@@ -215,23 +211,18 @@ public class POProject extends Expressio
      */
     private void addColumn(ArrayList<Object> objList, Tuple inpValue, int i)
     throws ExecException {
-        try {
+        if( inpValue == null ) {
+            // the tuple is null, so a dereference should also produce a null
+            objList.add(null);
+        } else if( inpValue.size() > i ) {
             objList.add(inpValue.get(i));
-        } catch (IndexOutOfBoundsException ie) {
+        } else {
             if(pigLogger != null) {
                 pigLogger.warn(this,"Attempt to access field " + i +
                         " which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD);
             }
             objList.add(null);
         }
-        catch (NullPointerException npe) {
-            // the tuple is null, so a dereference should also produce a null
-            // there is a slight danger here that the Tuple implementation
-            // may have given the exception for a different reason but if we
-            // don't catch it, we will die and the most common case for the
-            // exception would be because the tuple is null
-            objList.add(null);
-        }
     }
 
     @Override
@@ -406,21 +397,17 @@ public class POProject extends Expressio
             Object ret;
 
             if(columns.size() == 1) {
-                try{
+                if( inpValue == null ) {
+                    // the tuple is null, so a dereference should also produce a null
+                    ret = null;
+                } else if( inpValue.size() > columns.get(0) ) {
                     ret = inpValue.get(columns.get(0));
-                } catch (IndexOutOfBoundsException ie) {
+                } else {
                     if(pigLogger != null) {
                         pigLogger.warn(this,"Attempt to access field " +
                                 "which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD);
                     }
                     ret = null;
-                } catch (NullPointerException npe) {
-                    // the tuple is null, so a dereference should also produce a null
-                    // there is a slight danger here that the Tuple implementation
-                    // may have given the exception for a different reason but if we
-                    // don't catch it, we will die and the most common case for the
-                    // exception would be because the tuple is null
-                    ret = null;
                 }
             } else if(isProjectToEnd) {
                 ret = getRangeTuple(inpValue);
@@ -428,21 +415,17 @@ public class POProject extends Expressio
                 ArrayList<Object> objList = new ArrayList<Object>(columns.size());
 
                 for(int col: columns) {
-                    try {
+                    if( inpValue == null ) {
+                        // the tuple is null, so a dereference should also produce a null
+                        objList.add(null);
+                    } else if( inpValue.size() > col ) {
                         objList.add(inpValue.get(col));
-                    } catch (IndexOutOfBoundsException ie) {
+                    } else {
                         if(pigLogger != null) {
                             pigLogger.warn(this,"Attempt to access field " +
                                     "which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD);
                         }
                         objList.add(null);
-                    } catch (NullPointerException npe) {
-                        // the tuple is null, so a dereference should also produce a null
-                        // there is a slight danger here that the Tuple implementation
-                        // may have given the exception for a different reason but if we
-                        // don't catch it, we will die and the most common case for the
-                        // exception would be because the tuple is null
-                        objList.add(null);
                     }
                 }
                 ret = mTupleFactory.newTuple(objList);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java Wed Feb 22 09:43:41 2017
@@ -49,7 +49,7 @@ public class CombinerPackager extends Pa
     private Map<Integer, Integer> keyLookup;
 
     private int numBags;
-    
+
     private transient boolean initialized;
     private transient boolean useDefaultBag;
 
@@ -77,6 +77,15 @@ public class CombinerPackager extends Pa
         }
     }
 
+    @Override
+    public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+            throws ExecException {
+        this.key = key;
+        this.bags = bags;
+        this.readOnce = readOnce;
+        // Bag can be read directly and need not be materialized again
+    }
+
     /**
      * @param keyInfo the keyInfo to set
      */

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java Wed Feb 22 09:43:41 2017
@@ -17,7 +17,7 @@
  */
 
 /**
- * 
+ *
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
@@ -28,6 +28,7 @@ import java.util.Map;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
@@ -48,6 +49,15 @@ public class LitePackager extends Packag
     private PigNullableWritable keyWritable;
 
     @Override
+    public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+            throws ExecException {
+        this.key = key;
+        this.bags = bags;
+        this.readOnce = readOnce;
+        // Bag can be read directly and need not be materialized again
+    }
+
+    @Override
     public boolean[] getInner() {
         return null;
     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java Wed Feb 22 09:43:41 2017
@@ -256,4 +256,9 @@ public class POCross extends PhysicalOpe
         data = null;
     }
 
+    @Override
+    public void reset() {
+        clearMemory();
+    }
+
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Wed Feb 22 09:43:41 2017
@@ -97,7 +97,7 @@ public class POFRJoin extends PhysicalOp
 
     // The array of Hashtables one per replicated input. replicates[fragment] =
     // null fragment is the input which is fragmented and not replicated.
-    protected transient TupleToMapKey replicates[];
+    protected transient List<Map<? extends Object, ? extends List<Tuple>>> replicates;
     // varaible which denotes whether we are returning tuples from the foreach
     // operator
     protected transient boolean processingPlan;
@@ -234,7 +234,10 @@ public class POFRJoin extends PhysicalOp
         Result res = null;
         Result inp = null;
         if (!setUp) {
-            replicates = new TupleToMapKey[phyPlanLists.size()];
+            replicates = new ArrayList<Map<? extends Object, ? extends List<Tuple>>>(phyPlanLists.size());
+            for (int i = 0 ; i < phyPlanLists.size(); i++) {
+                replicates.add(null);
+            }
             dumTup = mTupleFactory.newTuple(1);
             setUpHashMap();
             setUp = true;
@@ -282,8 +285,7 @@ public class POFRJoin extends PhysicalOp
                 return new Result();
             }
             Tuple lrOutTuple = (Tuple) lrOut.result;
-            Tuple key = mTupleFactory.newTuple(1);
-            key.set(0, lrOutTuple.get(1));
+            Object key = lrOutTuple.get(1);
             Tuple value = getValueTuple(lr, lrOutTuple);
             lr.detachInput();
             // Configure the for each operator with the relevant bags
@@ -296,7 +298,7 @@ public class POFRJoin extends PhysicalOp
                     ce.setValue(value);
                     continue;
                 }
-                TupleToMapKey replicate = replicates[i];
+                Map<? extends Object, ? extends List<Tuple>> replicate = replicates.get(i);
                 if (replicate.get(key) == null) {
                     if (isLeftOuterJoin) {
                         ce.setValue(nullBag);
@@ -304,7 +306,7 @@ public class POFRJoin extends PhysicalOp
                     noMatch = true;
                     break;
                 }
-                ce.setValue(new NonSpillableDataBag(replicate.get(key).getList()));
+                ce.setValue(new NonSpillableDataBag(replicate.get(key)));
             }
 
             // If this is not LeftOuter Join and there was no match we
@@ -327,27 +329,28 @@ public class POFRJoin extends PhysicalOp
         }
     }
 
-    protected static class TupleToMapKey {
-        private HashMap<Tuple, TuplesToSchemaTupleList> tuples;
+    protected static class TupleToMapKey extends HashMap<Object, ArrayList<Tuple>> {
         private SchemaTupleFactory tf;
 
         public TupleToMapKey(int ct, SchemaTupleFactory tf) {
-            tuples = new HashMap<Tuple, TuplesToSchemaTupleList>(ct);
+            super(ct);
             this.tf = tf;
         }
 
-        public TuplesToSchemaTupleList put(Tuple key, TuplesToSchemaTupleList val) {
-            if (tf != null) {
-                key = TuplesToSchemaTupleList.convert(key, tf);
+        @Override
+        public TuplesToSchemaTupleList put(Object key, ArrayList<Tuple> val) {
+            if (tf != null && key instanceof Tuple) {
+                key = TuplesToSchemaTupleList.convert((Tuple)key, tf);
             }
-            return tuples.put(key, val);
+            return (TuplesToSchemaTupleList) super.put(key, val);
         }
 
-        public TuplesToSchemaTupleList get(Tuple key) {
-            if (tf != null) {
-                key = TuplesToSchemaTupleList.convert(key, tf);
+        @Override
+        public TuplesToSchemaTupleList get(Object key) {
+            if (tf != null && key instanceof Tuple) {
+                key = TuplesToSchemaTupleList.convert((Tuple)key, tf);
             }
-            return tuples.get(key);
+            return (TuplesToSchemaTupleList) super.get(key);
         }
     }
 
@@ -382,7 +385,7 @@ public class POFRJoin extends PhysicalOp
             SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[i];
 
             if (i == fragment) {
-                replicates[i] = null;
+                replicates.set(i, null);
                 continue;
             }
 
@@ -401,25 +404,34 @@ public class POFRJoin extends PhysicalOp
             POLocalRearrange lr = LRs[i];
             lr.setInputs(Arrays.asList((PhysicalOperator) ld));
 
-            TupleToMapKey replicate = new TupleToMapKey(1000, keySchemaTupleFactory);
+            Map<Object, ArrayList<Tuple>> replicate;
+            if (keySchemaTupleFactory == null) {
+                replicate = new HashMap<Object, ArrayList<Tuple>>(1000);
+            } else {
+                replicate = new TupleToMapKey(1000, keySchemaTupleFactory);
+            }
 
             log.debug("Completed setup. Trying to build replication hash table");
             for (Result res = lr.getNextTuple(); res.returnStatus != POStatus.STATUS_EOP; res = lr.getNextTuple()) {
                 if (getReporter() != null)
                     getReporter().progress();
                 Tuple tuple = (Tuple) res.result;
-                if (isKeyNull(tuple.get(1))) continue;
-                Tuple key = mTupleFactory.newTuple(1);
-                key.set(0, tuple.get(1));
+                Object key = tuple.get(1);
+                if (isKeyNull(key)) continue;
                 Tuple value = getValueTuple(lr, tuple);
 
-                if (replicate.get(key) == null) {
-                    replicate.put(key, new TuplesToSchemaTupleList(1, inputSchemaTupleFactory));
+                ArrayList<Tuple> values = replicate.get(key);
+                if (values == null) {
+                    if (inputSchemaTupleFactory == null) {
+                        values = new ArrayList<Tuple>(1);
+                    } else {
+                        values = new TuplesToSchemaTupleList(1, inputSchemaTupleFactory);
+                    }
+                    replicate.put(key, values);
                 }
-
-                replicate.get(key).add(value);
+                values.add(value);
             }
-            replicates[i] = replicate;
+            replicates.set(i, replicate);
         }
         long time2 = System.currentTimeMillis();
         log.debug("Hash Table built. Time taken: " + (time2 - time1));

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java Wed Feb 22 09:43:41 2017
@@ -51,7 +51,7 @@ public class POFRJoinSpark extends POFRJ
             addSchemaToFactories(keySchemas[i], keySchemaTupleFactories, i);
         }
 
-        replicates[fragment] = null;
+        replicates.set(fragment, null);
         int i = -1;
         long start = System.currentTimeMillis();
         for (int k = 0; k < inputSchemas.length; ++k) {
@@ -61,7 +61,7 @@ public class POFRJoinSpark extends POFRJ
             SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[i];
 
             if (i == fragment) {
-                replicates[i] = null;
+                replicates.set(fragment, null);
                 continue;
             }
 
@@ -91,7 +91,7 @@ public class POFRJoinSpark extends POFRJ
                 replicate.get(key).add(value);
 
             }
-            replicates[i] = replicate;
+            replicates.set(i, replicate);
         }
         long end = System.currentTimeMillis();
         log.debug("Hash Table built. Time taken: " + (end - start));

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Wed Feb 22 09:43:41 2017
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -55,6 +56,7 @@ import org.apache.pig.pen.util.LineageTr
 @SuppressWarnings("unchecked")
 public class POForEach extends PhysicalOperator {
     private static final long serialVersionUID = 1L;
+    private static final Result UNLIMITED_NULL_RESULT = new Result(POStatus.STATUS_OK, new UnlimitedNullTuple());
 
     protected List<PhysicalPlan> inputPlans;
 
@@ -264,7 +266,7 @@ public class POForEach extends PhysicalO
                 if (inp.returnStatus == POStatus.STATUS_EOP) {
                     if (parentPlan!=null && parentPlan.endOfAllInput && !endOfAllInputProcessed && endOfAllInputProcessing) {
                         // continue pull one more output
-                        inp = new Result(POStatus.STATUS_OK, new UnlimitedNullTuple());
+                        inp = UNLIMITED_NULL_RESULT;
                     } else {
                         return inp;
                     }
@@ -441,6 +443,8 @@ public class POForEach extends PhysicalO
 
                 if(inputData.result instanceof DataBag && isToBeFlattenedArray[i]) {
                     its[i] = ((DataBag)bags[i]).iterator();
+                } else if (inputData.result instanceof Map && isToBeFlattenedArray[i]) {
+                    its[i] = ((Map)bags[i]).entrySet().iterator();
                 } else {
                     its[i] = null;
                 }
@@ -466,7 +470,7 @@ public class POForEach extends PhysicalO
                 //we instantiate the template array and start populating it with data
                 data = new Object[noItems];
                 for(int i = 0; i < noItems; ++i) {
-                    if(isToBeFlattenedArray[i] && bags[i] instanceof DataBag) {
+                    if(isToBeFlattenedArray[i] && (bags[i] instanceof DataBag || bags[i] instanceof Map)) {
                         if(its[i].hasNext()) {
                             data[i] = its[i].next();
                         } else {
@@ -540,6 +544,15 @@ public class POForEach extends PhysicalO
                     out.append(t.get(j));
                 }
                 }
+            } else if (isToBeFlattenedArray[i] && in instanceof Map.Entry) {
+                Map.Entry entry = (Map.Entry)in;
+                if (knownSize) {
+                    out.set(idx++, entry.getKey());
+                    out.set(idx++, entry.getValue());
+                } else {
+                    out.append(entry.getKey());
+                    out.append(entry.getValue());
+                }
             } else {
                 if (knownSize) {
                     out.set(idx++, in);
@@ -738,9 +751,12 @@ public class POForEach extends PhysicalO
             opsToBeReset.add(sort);
         }
 
-        /* (non-Javadoc)
-         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitProject(org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject)
-         */
+        @Override
+        public void visitCross(POCross c) throws VisitorException {
+            // FIXME: add only if limit is present
+            opsToBeReset.add(c);
+        }
+
         @Override
         public void visitProject(POProject proj) throws VisitorException {
             if(proj instanceof PORelationToExprProject) {

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Wed Feb 22 09:43:41 2017
@@ -56,11 +56,11 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 
-/** This operator implements merge join algorithm to do map side joins. 
+/** This operator implements merge join algorithm to do map side joins.
  *  Currently, only two-way joins are supported. One input of join is identified as left
  *  and other is identified as right. Left input tuples are the input records in map.
  *  Right tuples are read from HDFS by opening right stream.
- *  
+ *
  *    This join doesn't support outer join.
  *    Data is assumed to be sorted in ascending order. It will fail if data is sorted in descending order.
  */
@@ -99,7 +99,7 @@ public class POMergeJoin extends Physica
     private FuncSpec rightLoaderFuncSpec;
 
     private String rightInputFileName;
-    
+
     private String indexFile;
 
     // Buffer to hold accumulated left tuples.
@@ -249,12 +249,11 @@ public class POMergeJoin extends Physica
      * from Tuple to SchemaTuple. This is necessary because we are not getting SchemaTuples
      * from the source, though in the future that is what we would like to do.
      */
-    public static class TuplesToSchemaTupleList {
-        private List<Tuple> tuples;
+    public static class TuplesToSchemaTupleList extends ArrayList<Tuple> {
         private SchemaTupleFactory tf;
 
         public TuplesToSchemaTupleList(int ct, TupleMaker<?> tf) {
-            tuples = new ArrayList<Tuple>(ct);
+            super(ct);
             if (tf instanceof SchemaTupleFactory) {
                 this.tf = (SchemaTupleFactory)tf;
             }
@@ -273,24 +272,24 @@ public class POMergeJoin extends Physica
             }
         }
 
+        @Override
         public boolean add(Tuple t) {
             if (tf != null) {
                 t = convert(t, tf);
             }
-            return tuples.add(t);
+            return super.add(t);
         }
 
+        @Override
         public Tuple get(int i) {
-            return tuples.get(i);
+            return super.get(i);
         }
 
+        @Override
         public int size() {
-            return tuples.size();
+            return super.size();
         }
 
-        public List<Tuple> getList() {
-            return tuples;
-        }
     }
 
     @SuppressWarnings("unchecked")
@@ -357,7 +356,7 @@ public class POMergeJoin extends Physica
                 }
                 else{
                     Object rightKey = extractKeysFromTuple(rightInp, 1);
-                    if(null == rightKey) // If we see tuple having null keys in stream, we drop them 
+                    if(null == rightKey) // If we see tuple having null keys in stream, we drop them
                         continue;       // and fetch next tuple.
 
                     int cmpval = ((Comparable)rightKey).compareTo(curJoinKey);
@@ -399,7 +398,7 @@ public class POMergeJoin extends Physica
                             "Last two tuples encountered were: \n"+
                         curJoiningRightTup+ "\n" + (Tuple)rightInp.result ;
                         throw new ExecException(errMsg,errCode);
-                    }    
+                    }
                 }
             }
         }
@@ -430,17 +429,17 @@ public class POMergeJoin extends Physica
                 prevLeftKey+ "\n" + curLeftKey ;
                 throw new ExecException(errMsg,errCode);
             }
- 
+
         case POStatus.STATUS_EOP:
             if(this.parentPlan.endOfAllInput || isEndOfInput()){
-                // We hit the end on left input. 
+                // We hit the end on left input.
                 // Tuples in bag may still possibly join with right side.
                 curJoinKey = prevLeftKey;
                 curLeftKey = null;
                 if (isEndOfInput()) {
                     leftInputConsumedInSpark = true;
                 }
-                break;                
+                break;
             }
             else    // Fetch next left input.
                 return curLeftInp;
@@ -465,7 +464,7 @@ public class POMergeJoin extends Physica
         // Accumulated tuples with same key on left side.
         // But since we are reading ahead we still haven't checked the read ahead right tuple.
         // Accumulated left tuples may potentially join with that. So, lets check that first.
-        
+
         if((null != prevRightKey) && prevRightKey.equals(prevLeftKey)){
 
             curJoiningRightTup = (Tuple)prevRightInp.result;
@@ -487,17 +486,17 @@ public class POMergeJoin extends Physica
                 slidingToNextRecord = false;
             } else
                 rightInp = getNextRightInp(prevLeftKey);
-                
+
             if(rightInp.returnStatus != POStatus.STATUS_OK)
                 return rightInp;
 
             Object extractedRightKey = extractKeysFromTuple(rightInp, 1);
-            
-            if(null == extractedRightKey) // If we see tuple having null keys in stream, we drop them 
+
+            if(null == extractedRightKey) // If we see tuple having null keys in stream, we drop them
                 continue;       // and fetch next tuple.
-            
+
             Comparable rightKey = (Comparable)extractedRightKey;
-            
+
             if( prevRightKey != null && rightKey.compareTo(prevRightKey) < 0){
                 // Sanity check.
                 int errCode = 1102;
@@ -528,7 +527,7 @@ public class POMergeJoin extends Physica
             else{    // We got ahead on right side. Store currently read right tuple.
                 prevRightKey = rightKey;
                 prevRightInp = rightInp;
-                // Since we didn't find any matching right tuple we throw away the buffered left tuples and add the one read in this function call. 
+                // Since we didn't find any matching right tuple we throw away the buffered left tuples and add the one read in this function call.
                 leftTuples = newLeftTupleArray();
                 leftTuples.add((Tuple)curLeftInp.result);
                 prevLeftInp = curLeftInp;
@@ -555,7 +554,7 @@ public class POMergeJoin extends Physica
             DefaultIndexableLoader loader = (DefaultIndexableLoader)rightLoader;
             loader.setIndexFile(indexFile);
         }
-        
+
         // Pass signature of the loader to rightLoader
         // make a copy of the conf to use in calls to rightLoader.
         rightLoader.setUDFContextSignature(signature);
@@ -608,11 +607,11 @@ public class POMergeJoin extends Physica
                         // run the tuple through the pipeline
                         rightPipelineRoot.attachInput(t);
                         return this.getNextRightInp();
-                        
+
                     }
                     default: // We don't deal with ERR/NULL. just pass them down
                         throwProcessingException(false, null);
-                        
+
                 }
             }
         } catch (IOException e) {
@@ -643,8 +642,8 @@ public class POMergeJoin extends Physica
             int errCode = 2167;
             String errMsg = "LocalRearrange used to extract keys from tuple isn't configured correctly";
             throw new ExecException(errMsg,errCode,PigException.BUG);
-        } 
-          
+        }
+
         return ((Tuple) lrOut.result).get(1);
     }
 
@@ -660,7 +659,7 @@ public class POMergeJoin extends Physica
             noInnerPlanOnRightSide = false;
             this.rightPipelineLeaf = rightPipeline.getLeaves().get(0);
             this.rightPipelineRoot = rightPipeline.getRoots().get(0);
-            this.rightPipelineRoot.setInputs(null);            
+            this.rightPipelineRoot.setInputs(null);
         }
         else
             noInnerPlanOnRightSide = true;
@@ -711,18 +710,18 @@ public class POMergeJoin extends Physica
     public boolean supportsMultipleOutputs() {
         return false;
     }
-    
+
     /**
      * @param rightInputFileName the rightInputFileName to set
      */
     public void setRightInputFileName(String rightInputFileName) {
         this.rightInputFileName = rightInputFileName;
     }
-    
+
     public String getSignature() {
         return signature;
     }
-    
+
     public void setSignature(String signature) {
         this.signature = signature;
     }
@@ -734,12 +733,12 @@ public class POMergeJoin extends Physica
     public String getIndexFile() {
         return indexFile;
     }
-    
+
     @Override
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
         return null;
     }
-    
+
     public LOJoin.JOINTYPE getJoinType() {
         return joinType;
     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java Wed Feb 22 09:43:41 2017
@@ -44,6 +44,9 @@ public class POPoissonSample extends Phy
 
     private transient boolean initialized;
 
+    // num of rows skipped so far
+    private transient int numSkipped;
+
     // num of rows sampled so far
     private transient int numRowsSampled;
 
@@ -89,6 +92,7 @@ public class POPoissonSample extends Phy
     @Override
     public Result getNextTuple() throws ExecException {
         if (!initialized) {
+            numSkipped = 0;
             numRowsSampled = 0;
             avgTupleMemSz = 0;
             rowNum = 0;
@@ -134,7 +138,7 @@ public class POPoissonSample extends Phy
         }
 
         // skip tuples
-        for (long numSkipped  = 0; numSkipped < skipInterval; numSkipped++) {
+        while (numSkipped < skipInterval) {
             res = processInput();
             if (res.returnStatus == POStatus.STATUS_NULL) {
                 continue;
@@ -148,6 +152,7 @@ public class POPoissonSample extends Phy
                 return res;
             }
             rowNum++;
+            numSkipped++;
         }
 
         // skipped enough, get new sample
@@ -173,6 +178,8 @@ public class POPoissonSample extends Phy
 
             rowNum++;
             newSample = res;
+            // reset skipped
+            numSkipped = 0;
             return currentSample;
         }
     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java Wed Feb 22 09:43:41 2017
@@ -125,7 +125,7 @@ public class POReservoirSample extends P
                 }
 
                 // collect samples until input is exhausted
-                int rand = randGen.nextInt(rowProcessed);
+                int rand = randGen.nextInt(rowProcessed + 1);
                 if (rand < numSamples) {
                     samples[rand] = res;
                 }
@@ -133,8 +133,13 @@ public class POReservoirSample extends P
             }
         }
 
-        if (this.parentPlan.endOfAllInput && res.returnStatus == POStatus.STATUS_EOP) {
-            sampleCollectionDone = true;
+        if (res.returnStatus == POStatus.STATUS_EOP) {
+            if (this.parentPlan.endOfAllInput) {
+                sampleCollectionDone = true;
+            } else {
+                // In case of Split can get EOP in between.
+                return res;
+            }
         }
 
         return getSample();

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Wed Feb 22 09:43:41 2017
@@ -51,13 +51,13 @@ public class Packager implements Illustr
     protected DataBag[] bags;
 
     public static enum PackageType {
-        GROUP, JOIN
+        GROUP, JOIN, BLOOMJOIN
     };
 
     protected transient Illustrator illustrator = null;
 
     // The key being worked on
-    Object key;
+    protected Object key;
 
     // marker to indicate if key is a tuple
     protected boolean isKeyTuple = false;
@@ -65,7 +65,7 @@ public class Packager implements Illustr
     protected boolean isKeyCompound = false;
 
     // key's type
-    byte keyType;
+    protected byte keyType;
 
     // The number of inputs to this
     // co-group. 0 indicates a distinct, which means there will only be a

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java Wed Feb 22 09:43:41 2017
@@ -60,7 +60,7 @@ public class StoreFuncDecorator {
 
     private boolean allowErrors() {
         return UDFContext.getUDFContext().getJobConf()
-                .getBoolean(PigConfiguration.PIG_ALLOW_STORE_ERRORS, false);
+                .getBoolean(PigConfiguration.PIG_ERROR_HANDLING_ENABLED, false);
     }
 
     /**

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Wed Feb 22 09:43:41 2017
@@ -162,8 +162,13 @@ public class LoadConverter implements RD
         private SparkEngineConf sparkEngineConf;
         private boolean initialized;
 
+        //LoadConverter#ToTupleFunction is executed more than once in multiquery case causing
+        //invalid number of input records, 'skip' flag below indicates first load is finished.
+        private boolean skip;
+
         public ToTupleFunction(SparkEngineConf sparkEngineConf){
                this.sparkEngineConf = sparkEngineConf;
+
         }
 
         @Override
@@ -172,9 +177,14 @@ public class LoadConverter implements RD
                 long partitionId = TaskContext.get().partitionId();
                 PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, Long.toString(partitionId));
 
+                //We're in POSplit and already counted all input records,
+                //in a multiquery case skip will be set to true after the first load is finished:
+                if (sparkCounters != null && SparkPigStatusReporter.getInstance().getCounters().getCounter(counterGroupName, counterName).getValue() > 0) {
+                    skip=true;
+                }
                 initialized = true;
             }
-            if (sparkCounters != null && disableCounter == false) {
+            if (sparkCounters != null && disableCounter == false && skip == false) {
                 sparkCounters.increment(counterGroupName, counterName, 1L);
             }
             return v1._2();

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed Feb 22 09:43:41 2017
@@ -19,13 +19,14 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -43,6 +44,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -56,6 +58,7 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigSecondaryKeyGroupComparator;
@@ -87,7 +90,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
@@ -108,7 +110,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.PigImplConstants;
-import org.apache.pig.impl.builtin.DefaultIndexableLoader;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableTuple;
@@ -174,6 +175,7 @@ public class TezDagBuilder extends TezOp
     private PigContext pc;
     private Configuration globalConf;
     private Configuration pigContextConf;
+    private Configuration shuffleVertexManagerBaseConf;
     private FileSystem fs;
     private long intermediateTaskInputSize;
     private Set<String> inputSplitInDiskVertices;
@@ -191,6 +193,8 @@ public class TezDagBuilder extends TezOp
     private String mapTaskLaunchCmdOpts;
     private String reduceTaskLaunchCmdOpts;
 
+    private boolean disableDAGRecovery = false;
+
     public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag,
             Map<String, LocalResource> localResources) {
         super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
@@ -210,6 +214,10 @@ public class TezDagBuilder extends TezOp
         }
     }
 
+    public boolean shouldDisableDAGRecovery() {
+        return disableDAGRecovery;
+    }
+
     private void initialize(PigContext pc) throws IOException {
 
         this.globalConf = ConfigurationUtil.toConfiguration(pc.getProperties(), true);
@@ -217,6 +225,16 @@ public class TezDagBuilder extends TezOp
         this.pigContextConf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
         MRToTezHelper.processMRSettings(pigContextConf, globalConf);
 
+        shuffleVertexManagerBaseConf = new Configuration(false);
+        // Only copy tez.shuffle-vertex-manager config to keep payload size small
+        Iterator<Entry<String, String>> iter = pigContextConf.iterator();
+        while (iter.hasNext()) {
+            Entry<String, String> entry = iter.next();
+            if (entry.getKey().startsWith("tez.shuffle-vertex-manager")) {
+                shuffleVertexManagerBaseConf.set(entry.getKey(), entry.getValue());
+            }
+        }
+
         // Add credentials from binary token file and get tokens for namenodes
         // specified in mapreduce.job.hdfs-servers
         SecurityHelper.populateTokenCache(globalConf, dag.getCredentials());
@@ -265,7 +283,7 @@ public class TezDagBuilder extends TezOp
         if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) == null) {
             // If tez setting is not defined
             MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, mapTaskEnv, true);
-            MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, true);
+            MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, false);
         }
 
         if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) != null) {
@@ -279,7 +297,7 @@ public class TezDagBuilder extends TezOp
 
         try {
             fs = FileSystem.get(globalConf);
-            intermediateTaskInputSize = HadoopShims.getDefaultBlockSize(fs, FileLocalizer.getTemporaryResourcePath(pc));
+            intermediateTaskInputSize = fs.getDefaultBlockSize(FileLocalizer.getTemporaryResourcePath(pc));
         } catch (Exception e) {
             log.warn("Unable to get the block size for temporary directory, defaulting to 128MB", e);
             intermediateTaskInputSize = 134217728L;
@@ -397,7 +415,11 @@ public class TezDagBuilder extends TezOp
                 tezOp.getVertexGroupInfo().setVertexGroup(vertexGroup);
                 POStore store = tezOp.getVertexGroupInfo().getStore();
                 if (store != null) {
-                    vertexGroup.addDataSink(store.getOperatorKey().toString(),
+                    String outputKey = store.getOperatorKey().toString();
+                    if (store instanceof POStoreTez) {
+                        outputKey = ((POStoreTez) store).getOutputKey();
+                    }
+                    vertexGroup.addDataSink(outputKey,
                             DataSinkDescriptor.create(tezOp.getVertexGroupInfo().getStoreOutputDescriptor(),
                             OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), dag.getCredentials()));
                 }
@@ -441,7 +463,14 @@ public class TezDagBuilder extends TezOp
 
         Configuration conf = new Configuration(pigContextConf);
 
-        if (!combinePlan.isEmpty()) {
+        if (edge.needsDistinctCombiner()) {
+            conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS,
+                    MRCombiner.class.getName());
+            conf.set(MRJobConfig.COMBINE_CLASS_ATTR,
+                    DistinctCombiner.Combine.class.getName());
+            log.info("Setting distinct combiner class between "
+                    + from.getOperatorKey() + " and " + to.getOperatorKey());
+        } else if (!combinePlan.isEmpty()) {
             udfContextSeparator.serializeUDFContextForEdge(conf, from, to, UDFType.USERFUNC);
             addCombiner(combinePlan, to, conf, isMergedInput);
         }
@@ -450,7 +479,7 @@ public class TezDagBuilder extends TezOp
                 POLocalRearrangeTez.class);
 
         for (POLocalRearrangeTez lr : lrs) {
-            if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+            if (lr.containsOutputKey(to.getOperatorKey().toString())) {
                 byte keyType = lr.getKeyType();
                 setIntermediateOutputKeyValue(keyType, conf, to, lr.isConnectedToPackage(), isMergedInput);
                 // In case of secondary key sort, main key type is the actual key type
@@ -479,7 +508,8 @@ public class TezDagBuilder extends TezOp
 
         conf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
         conf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
-        conf.set("pig.pigContext", serializedPigContext);
+        conf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pc.getExecType().isLocal());
+        conf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pc.getLog4jProperties()));
         conf.set("udf.import.list", serializedUDFImportList);
 
         if(to.isGlobalSort() || to.isLimitAfterSort()){
@@ -510,26 +540,36 @@ public class TezDagBuilder extends TezOp
 
         UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
         out.setUserPayload(payLoad);
+        in.setUserPayload(payLoad);
 
+        // Remove combiner and reset payload
         if (!combinePlan.isEmpty()) {
             boolean noCombineInReducer = false;
+            boolean noCombineInMapper = edge.getCombinerInMap() == null ? false : !edge.getCombinerInMap();
             String reducerNoCombiner = globalConf.get(PigConfiguration.PIG_EXEC_NO_COMBINER_REDUCER);
-            if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) {
+            if (edge.getCombinerInReducer() != null) {
+                noCombineInReducer = !edge.getCombinerInReducer();
+            } else if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) {
                 noCombineInReducer = TezCompilerUtil.bagDataTypeInCombinePlan(combinePlan);
             } else {
                 noCombineInReducer = Boolean.parseBoolean(reducerNoCombiner);
             }
-            if (noCombineInReducer) {
+            if (noCombineInReducer || noCombineInMapper) {
                 log.info("Turning off combiner in reducer vertex " + to.getOperatorKey() + " for edge from " + from.getOperatorKey());
                 conf.unset(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS);
                 conf.unset(MRJobConfig.COMBINE_CLASS_ATTR);
                 conf.unset("pig.combinePlan");
                 conf.unset("pig.combine.package");
                 conf.unset("pig.map.keytype");
-                payLoad = TezUtils.createUserPayloadFromConf(conf);
+                UserPayload payLoadWithoutCombiner = TezUtils.createUserPayloadFromConf(conf);
+                if (noCombineInMapper) {
+                    out.setUserPayload(payLoadWithoutCombiner);
+                }
+                if (noCombineInReducer) {
+                    in.setUserPayload(payLoadWithoutCombiner);
+                }
             }
         }
-        in.setUserPayload(payLoad);
 
         if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && to.getVertexParallelism()==-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
             // Use custom edge
@@ -593,6 +633,8 @@ public class TezDagBuilder extends TezOp
         setOutputFormat(job);
         payloadConf.set("udf.import.list", serializedUDFImportList);
         payloadConf.set("exectype", "TEZ");
+        payloadConf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pc.getExecType().isLocal());
+        payloadConf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pc.getLog4jProperties()));
 
         // Process stores
         LinkedList<POStore> stores = processStores(tezOp, payloadConf, job);
@@ -611,11 +653,7 @@ public class TezDagBuilder extends TezOp
             payloadConf.set(PigInputFormat.PIG_INPUT_SIGNATURES, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists()));
             payloadConf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits()));
             inputPayLoad = new Configuration(payloadConf);
-            if (tezOp.getLoaderInfo().getLoads().get(0).getLoadFunc() instanceof DefaultIndexableLoader) {
-                inputPayLoad.set("pig.pigContext", serializedPigContext);
-            }
         }
-        payloadConf.set("pig.pigContext", serializedPigContext);
 
         if (tezOp.getSampleOperator() != null) {
             payloadConf.set(PigProcessor.SAMPLE_VERTEX, tezOp.getSampleOperator().getOperatorKey().toString());
@@ -689,7 +727,7 @@ public class TezDagBuilder extends TezOp
                             PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class);
                     for (POLocalRearrangeTez lr : lrs) {
                         if (lr.isConnectedToPackage()
-                                && lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) {
+                                && lr.containsOutputKey(tezOp.getOperatorKey().toString())) {
                             localRearrangeMap.put((int) lr.getIndex(), inputKey);
                             if (isVertexGroup) {
                                 isMergedInput = true;
@@ -772,9 +810,25 @@ public class TezDagBuilder extends TezOp
 
         String vmPluginName = null;
         Configuration vmPluginConf = null;
+        boolean containScatterGather = false;
+        boolean containCustomPartitioner = false;
+        for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
+            if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) {
+                containScatterGather = true;
+            }
+            if (edge.partitionerClass != null) {
+                containCustomPartitioner = true;
+            }
+        }
+
+        if(containScatterGather) {
+            vmPluginName = ShuffleVertexManager.class.getName();
+            vmPluginConf = new Configuration(shuffleVertexManagerBaseConf);
+        }
 
         // Set the right VertexManagerPlugin
         if (tezOp.getEstimatedParallelism() != -1) {
+            boolean autoParallelism = false;
             if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
                 if (tezOp.getVertexParallelism()==-1 && (
                         tezOp.isGlobalSort() &&getPlan().getPredecessors(tezOp).size()==1||
@@ -783,33 +837,12 @@ public class TezDagBuilder extends TezOp
                     // to decrease/increase parallelism of sorting vertex dynamically
                     // based on the numQuantiles calculated by sample aggregation vertex
                     vmPluginName = PartitionerDefinedVertexManager.class.getName();
+                    autoParallelism = true;
                     log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString());
                 }
             } else {
-                boolean containScatterGather = false;
-                boolean containCustomPartitioner = false;
-                for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
-                    if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) {
-                        containScatterGather = true;
-                    }
-                    if (edge.partitionerClass!=null) {
-                        containCustomPartitioner = true;
-                    }
-                }
                 if (containScatterGather && !containCustomPartitioner) {
-                    vmPluginConf = (vmPluginConf == null) ? new Configuration(pigContextConf) : vmPluginConf;
-                    // Use auto-parallelism feature of ShuffleVertexManager to dynamically
-                    // reduce the parallelism of the vertex
-                    if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
-                            && !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()) {
-                        vmPluginName = PigGraceShuffleVertexManager.class.getName();
-                        tezOp.setUseGraceParallelism(true);
-                        vmPluginConf.set("pig.tez.plan", getSerializedTezPlan());
-                        vmPluginConf.set("pig.pigContext", serializedPigContext);
-                    } else {
-                        vmPluginName = ShuffleVertexManager.class.getName();
-                    }
-                    vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
+
                     // For Intermediate reduce, set the bytes per reducer to be block size.
                     long bytesPerReducer = intermediateTaskInputSize;
                     // If there are store statements, use BYTES_PER_REDUCER_PARAM configured by user.
@@ -818,8 +851,8 @@ public class TezDagBuilder extends TezOp
                     // In Tez, numReducers=(map output size/bytesPerReducer) we need lower values to avoid skews in reduce
                     // as map input sizes are mostly always high compared to map output.
                     if (stores.size() > 0) {
-                        if (vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) {
-                            bytesPerReducer = vmPluginConf.getLong(
+                        if (pigContextConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) {
+                            bytesPerReducer = pigContextConf.getLong(
                                             InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                                             InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
                         } else if (tezOp.isGroupBy()) {
@@ -828,10 +861,28 @@ public class TezDagBuilder extends TezOp
                             bytesPerReducer = SHUFFLE_BYTES_PER_REDUCER_DEFAULT;
                         }
                     }
+
+                    // Use auto-parallelism feature of ShuffleVertexManager to dynamically
+                    // reduce the parallelism of the vertex. Use PigGraceShuffleVertexManager
+                    // instead of ShuffleVertexManager if pig.tez.grace.parallelism is turned on
+                    if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
+                            && !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()
+                            && tezOp.getCrossKeys() == null) {
+                        vmPluginName = PigGraceShuffleVertexManager.class.getName();
+                        tezOp.setUseGraceParallelism(true);
+                        vmPluginConf.set("pig.tez.plan", getSerializedTezPlan());
+                        vmPluginConf.set(PigImplConstants.PIG_CONTEXT, serializedPigContext);
+                        vmPluginConf.setLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, bytesPerReducer);
+                    }
+                    vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
                     vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, bytesPerReducer);
+                    autoParallelism = true;
                     log.info("Set auto parallelism for vertex " + tezOp.getOperatorKey().toString());
                 }
             }
+            if (globalConf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY, false) && autoParallelism) {
+                disableDAGRecovery = true;
+            }
         }
         if (tezOp.isLimit() && (vmPluginName == null || vmPluginName.equals(PigGraceShuffleVertexManager.class.getName())||
                 vmPluginName.equals(ShuffleVertexManager.class.getName()))) {
@@ -1409,22 +1460,12 @@ public class TezDagBuilder extends TezOp
 
     private void setOutputFormat(org.apache.hadoop.mapreduce.Job job) {
         // the OutputFormat we report to Hadoop is always PigOutputFormat which
-        // can be wrapped with LazyOutputFormat provided if it is supported by
-        // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
+        // can be wrapped with LazyOutputFormat provided if PigConfiguration.PIG_OUTPUT_LAZY is set
         if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) {
-            try {
-                Class<?> clazz = PigContext
-                        .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
-                Method method = clazz.getMethod("setOutputFormatClass",
-                        org.apache.hadoop.mapreduce.Job.class, Class.class);
-                method.invoke(null, job, PigOutputFormatTez.class);
-            } catch (Exception e) {
-                job.setOutputFormatClass(PigOutputFormatTez.class);
-                log.warn(PigConfiguration.PIG_OUTPUT_LAZY
-                        + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
-            }
+            LazyOutputFormat.setOutputFormatClass(job,PigOutputFormatTez.class);
         } else {
             job.setOutputFormatClass(PigOutputFormatTez.class);
         }
     }
+
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Wed Feb 22 09:43:41 2017
@@ -30,6 +30,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
 import org.apache.tez.client.TezClient;
@@ -51,7 +56,7 @@ import com.google.common.collect.Maps;
  */
 public class TezJob implements Runnable {
     private static final Log log = LogFactory.getLog(TezJob.class);
-    private Configuration conf;
+    private TezConfiguration conf;
     private EnumSet<StatusGetOpts> statusGetOpts;
     private Map<String, LocalResource> requestAMResources;
     private ApplicationId appId;
@@ -69,31 +74,71 @@ public class TezJob implements Runnable
 
     public TezJob(TezConfiguration conf, DAG dag,
             Map<String, LocalResource> requestAMResources,
-            int estimatedTotalParallelism) throws IOException {
+            TezOperPlan tezPlan) throws IOException {
         this.conf = conf;
         this.dag = dag;
         this.requestAMResources = requestAMResources;
         this.reuseSession = conf.getBoolean(PigConfiguration.PIG_TEZ_SESSION_REUSE, true);
         this.statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
-        tezJobConf = new TezJobConfig(estimatedTotalParallelism);
+        tezJobConf = new TezJobConfig(tezPlan);
     }
 
     static class TezJobConfig {
 
         private int estimatedTotalParallelism = -1;
+        private int maxOutputsinSingleVertex;
+        private int totalVertices  = 0;
 
-        public TezJobConfig(int estimatedTotalParallelism) {
-            this.estimatedTotalParallelism = estimatedTotalParallelism;
+        public TezJobConfig(TezOperPlan tezPlan) throws VisitorException {
+            this.estimatedTotalParallelism = tezPlan.getEstimatedTotalParallelism();
+            MaxOutputsFinder finder = new MaxOutputsFinder(tezPlan);
+            finder.visit();
+            this.maxOutputsinSingleVertex = finder.getMaxOutputsinSingleVertex();
+            this.totalVertices = finder.getTotalVertices();
         }
 
         public int getEstimatedTotalParallelism() {
             return estimatedTotalParallelism;
         }
 
-        public void setEstimatedTotalParallelism(int estimatedTotalParallelism) {
-            this.estimatedTotalParallelism = estimatedTotalParallelism;
+        public int getMaxOutputsinSingleVertex() {
+            return maxOutputsinSingleVertex;
         }
 
+        public int getTotalVertices() {
+            return totalVertices;
+        }
+
+    }
+
+    private static class MaxOutputsFinder extends TezOpPlanVisitor {
+
+        private int maxOutputsinSingleVertex  = 1;
+        private int totalVertices  = 0;
+
+        public MaxOutputsFinder(TezOperPlan plan) {
+            super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+        }
+
+        public int getMaxOutputsinSingleVertex() {
+            return maxOutputsinSingleVertex;
+        }
+
+        public int getTotalVertices() {
+            return totalVertices;
+        }
+
+        @Override
+        public void visitTezOp(TezOperator tezOperator) throws VisitorException {
+            if (!tezOperator.isVertexGroup()) {
+                totalVertices++;
+                int outputs = tezOperator.outEdges.keySet().size();
+                maxOutputsinSingleVertex = maxOutputsinSingleVertex > outputs ? maxOutputsinSingleVertex : outputs;
+            }
+        }
+
+
+
     }
 
     public DAG getDAG() {

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Wed Feb 22 09:43:41 2017
@@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,6 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.pig.PigException;
+import org.apache.pig.backend.hadoop.PigATSClient;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
@@ -50,11 +52,12 @@ public class TezJobCompiler {
     private static final Log log = LogFactory.getLog(TezJobCompiler.class);
 
     private PigContext pigContext;
-    private TezConfiguration tezConf;
+    private Configuration conf;
+    private boolean disableDAGRecovery;
 
     public TezJobCompiler(PigContext pigContext, Configuration conf) throws IOException {
         this.pigContext = pigContext;
-        this.tezConf = new TezConfiguration(conf);
+        this.conf = conf;
     }
 
     public DAG buildDAG(TezPlanContainerNode tezPlanNode, Map<String, LocalResource> localResources)
@@ -64,6 +67,7 @@ public class TezJobCompiler {
         TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlanNode.getTezOperPlan(), tezDag, localResources);
         dagBuilder.visit();
         dagBuilder.avoidContainerReuseIfInputSplitInDisk();
+        disableDAGRecovery = dagBuilder.shouldDisableDAGRecovery();
         return tezDag;
     }
 
@@ -85,6 +89,7 @@ public class TezJobCompiler {
         return job;
     }
 
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     private TezJob getJob(TezPlanContainerNode tezPlanNode, TezPlanContainer planContainer)
             throws JobCreationException {
         try {
@@ -107,8 +112,34 @@ public class TezJobCompiler {
             }
             DAG tezDag = buildDAG(tezPlanNode, localResources);
             tezDag.setDAGInfo(createDagInfo(TezScriptState.get().getScript()));
+            // set Tez caller context
+            // Reflection for the following code since it is only available since tez 0.8.1:
+            // CallerContext context = CallerContext.create(ATSService.CallerContext, ATSService.getPigAuditId(pigContext),
+            //     ATSService.EntityType, "");
+            // tezDag.setCallerContext(context);
+            Class callerContextClass = null;
+            try {
+                callerContextClass = Class.forName("org.apache.tez.client.CallerContext");
+            } catch (ClassNotFoundException e) {
+                // If pre-Tez 0.8.1, skip setting CallerContext
+            }
+            if (callerContextClass != null) {
+                Method builderBuildMethod = callerContextClass.getMethod("create", String.class,
+                        String.class, String.class, String.class);
+                Object context = builderBuildMethod.invoke(null, PigATSClient.CALLER_CONTEXT,
+                        PigATSClient.getPigAuditId(pigContext), PigATSClient.ENTITY_TYPE, "");
+                Method dagSetCallerContext = tezDag.getClass().getMethod("setCallerContext",
+                        context.getClass());
+                dagSetCallerContext.invoke(tezDag, context);
+            }
             log.info("Total estimated parallelism is " + tezPlan.getEstimatedTotalParallelism());
-            return new TezJob(tezConf, tezDag, localResources, tezPlan.getEstimatedTotalParallelism());
+            TezConfiguration tezConf = new TezConfiguration(conf);
+            if (disableDAGRecovery
+                    && tezConf.getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
+                            TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) {
+                tezConf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false);
+            }
+            return new TezJob(tezConf, tezDag, localResources, tezPlan);
         } catch (Exception e) {
             int errCode = 2017;
             String msg = "Internal error creating job configuration.";