You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/24 03:34:40 UTC

svn commit: r1784224 [5/17] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelo...

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Fri Feb 24 03:34:37 2017
@@ -97,7 +97,7 @@ public class POFRJoin extends PhysicalOp
 
     // The array of Hashtables one per replicated input. replicates[fragment] =
     // null fragment is the input which is fragmented and not replicated.
-    protected transient List<Map<? extends Object, ? extends List<Tuple>>> replicates;
+    protected transient TupleToMapKey replicates[];
     // varaible which denotes whether we are returning tuples from the foreach
     // operator
     protected transient boolean processingPlan;
@@ -234,10 +234,7 @@ public class POFRJoin extends PhysicalOp
         Result res = null;
         Result inp = null;
         if (!setUp) {
-            replicates = new ArrayList<Map<? extends Object, ? extends List<Tuple>>>(phyPlanLists.size());
-            for (int i = 0 ; i < phyPlanLists.size(); i++) {
-                replicates.add(null);
-            }
+            replicates = new TupleToMapKey[phyPlanLists.size()];
             dumTup = mTupleFactory.newTuple(1);
             setUpHashMap();
             setUp = true;
@@ -285,7 +282,8 @@ public class POFRJoin extends PhysicalOp
                 return new Result();
             }
             Tuple lrOutTuple = (Tuple) lrOut.result;
-            Object key = lrOutTuple.get(1);
+            Tuple key = mTupleFactory.newTuple(1);
+            key.set(0, lrOutTuple.get(1));
             Tuple value = getValueTuple(lr, lrOutTuple);
             lr.detachInput();
             // Configure the for each operator with the relevant bags
@@ -298,7 +296,7 @@ public class POFRJoin extends PhysicalOp
                     ce.setValue(value);
                     continue;
                 }
-                Map<? extends Object, ? extends List<Tuple>> replicate = replicates.get(i);
+                TupleToMapKey replicate = replicates[i];
                 if (replicate.get(key) == null) {
                     if (isLeftOuterJoin) {
                         ce.setValue(nullBag);
@@ -306,7 +304,7 @@ public class POFRJoin extends PhysicalOp
                     noMatch = true;
                     break;
                 }
-                ce.setValue(new NonSpillableDataBag(replicate.get(key)));
+                ce.setValue(new NonSpillableDataBag(replicate.get(key).getList()));
             }
 
             // If this is not LeftOuter Join and there was no match we
@@ -329,28 +327,27 @@ public class POFRJoin extends PhysicalOp
         }
     }
 
-    protected static class TupleToMapKey extends HashMap<Object, ArrayList<Tuple>> {
+    protected static class TupleToMapKey {
+        private HashMap<Tuple, TuplesToSchemaTupleList> tuples;
         private SchemaTupleFactory tf;
 
         public TupleToMapKey(int ct, SchemaTupleFactory tf) {
-            super(ct);
+            tuples = new HashMap<Tuple, TuplesToSchemaTupleList>(ct);
             this.tf = tf;
         }
 
-        @Override
-        public TuplesToSchemaTupleList put(Object key, ArrayList<Tuple> val) {
-            if (tf != null && key instanceof Tuple) {
-                key = TuplesToSchemaTupleList.convert((Tuple)key, tf);
+        public TuplesToSchemaTupleList put(Tuple key, TuplesToSchemaTupleList val) {
+            if (tf != null) {
+                key = TuplesToSchemaTupleList.convert(key, tf);
             }
-            return (TuplesToSchemaTupleList) super.put(key, val);
+            return tuples.put(key, val);
         }
 
-        @Override
-        public TuplesToSchemaTupleList get(Object key) {
-            if (tf != null && key instanceof Tuple) {
-                key = TuplesToSchemaTupleList.convert((Tuple)key, tf);
+        public TuplesToSchemaTupleList get(Tuple key) {
+            if (tf != null) {
+                key = TuplesToSchemaTupleList.convert(key, tf);
             }
-            return (TuplesToSchemaTupleList) super.get(key);
+            return tuples.get(key);
         }
     }
 
@@ -385,7 +382,7 @@ public class POFRJoin extends PhysicalOp
             SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[i];
 
             if (i == fragment) {
-                replicates.set(i, null);
+                replicates[i] = null;
                 continue;
             }
 
@@ -404,34 +401,25 @@ public class POFRJoin extends PhysicalOp
             POLocalRearrange lr = LRs[i];
             lr.setInputs(Arrays.asList((PhysicalOperator) ld));
 
-            Map<Object, ArrayList<Tuple>> replicate;
-            if (keySchemaTupleFactory == null) {
-                replicate = new HashMap<Object, ArrayList<Tuple>>(1000);
-            } else {
-                replicate = new TupleToMapKey(1000, keySchemaTupleFactory);
-            }
+            TupleToMapKey replicate = new TupleToMapKey(1000, keySchemaTupleFactory);
 
             log.debug("Completed setup. Trying to build replication hash table");
             for (Result res = lr.getNextTuple(); res.returnStatus != POStatus.STATUS_EOP; res = lr.getNextTuple()) {
                 if (getReporter() != null)
                     getReporter().progress();
                 Tuple tuple = (Tuple) res.result;
-                Object key = tuple.get(1);
-                if (isKeyNull(key)) continue;
+                if (isKeyNull(tuple.get(1))) continue;
+                Tuple key = mTupleFactory.newTuple(1);
+                key.set(0, tuple.get(1));
                 Tuple value = getValueTuple(lr, tuple);
 
-                ArrayList<Tuple> values = replicate.get(key);
-                if (values == null) {
-                    if (inputSchemaTupleFactory == null) {
-                        values = new ArrayList<Tuple>(1);
-                    } else {
-                        values = new TuplesToSchemaTupleList(1, inputSchemaTupleFactory);
-                    }
-                    replicate.put(key, values);
+                if (replicate.get(key) == null) {
+                    replicate.put(key, new TuplesToSchemaTupleList(1, inputSchemaTupleFactory));
                 }
-                values.add(value);
+
+                replicate.get(key).add(value);
             }
-            replicates.set(i, replicate);
+            replicates[i] = replicate;
         }
         long time2 = System.currentTimeMillis();
         log.debug("Hash Table built. Time taken: " + (time2 - time1));

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java Fri Feb 24 03:34:37 2017
@@ -51,7 +51,7 @@ public class POFRJoinSpark extends POFRJ
             addSchemaToFactories(keySchemas[i], keySchemaTupleFactories, i);
         }
 
-        replicates.set(fragment, null);
+        replicates[fragment] = null;
         int i = -1;
         long start = System.currentTimeMillis();
         for (int k = 0; k < inputSchemas.length; ++k) {
@@ -61,7 +61,7 @@ public class POFRJoinSpark extends POFRJ
             SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[i];
 
             if (i == fragment) {
-                replicates.set(fragment, null);
+                replicates[i] = null;
                 continue;
             }
 
@@ -91,7 +91,7 @@ public class POFRJoinSpark extends POFRJ
                 replicate.get(key).add(value);
 
             }
-            replicates.set(i, replicate);
+            replicates[i] = replicate;
         }
         long end = System.currentTimeMillis();
         log.debug("Hash Table built. Time taken: " + (end - start));

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Fri Feb 24 03:34:37 2017
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -56,7 +55,6 @@ import org.apache.pig.pen.util.LineageTr
 @SuppressWarnings("unchecked")
 public class POForEach extends PhysicalOperator {
     private static final long serialVersionUID = 1L;
-    private static final Result UNLIMITED_NULL_RESULT = new Result(POStatus.STATUS_OK, new UnlimitedNullTuple());
 
     protected List<PhysicalPlan> inputPlans;
 
@@ -266,7 +264,7 @@ public class POForEach extends PhysicalO
                 if (inp.returnStatus == POStatus.STATUS_EOP) {
                     if (parentPlan!=null && parentPlan.endOfAllInput && !endOfAllInputProcessed && endOfAllInputProcessing) {
                         // continue pull one more output
-                        inp = UNLIMITED_NULL_RESULT;
+                        inp = new Result(POStatus.STATUS_OK, new UnlimitedNullTuple());
                     } else {
                         return inp;
                     }
@@ -443,8 +441,6 @@ public class POForEach extends PhysicalO
 
                 if(inputData.result instanceof DataBag && isToBeFlattenedArray[i]) {
                     its[i] = ((DataBag)bags[i]).iterator();
-                } else if (inputData.result instanceof Map && isToBeFlattenedArray[i]) {
-                    its[i] = ((Map)bags[i]).entrySet().iterator();
                 } else {
                     its[i] = null;
                 }
@@ -470,7 +466,7 @@ public class POForEach extends PhysicalO
                 //we instantiate the template array and start populating it with data
                 data = new Object[noItems];
                 for(int i = 0; i < noItems; ++i) {
-                    if(isToBeFlattenedArray[i] && (bags[i] instanceof DataBag || bags[i] instanceof Map)) {
+                    if(isToBeFlattenedArray[i] && bags[i] instanceof DataBag) {
                         if(its[i].hasNext()) {
                             data[i] = its[i].next();
                         } else {
@@ -544,15 +540,6 @@ public class POForEach extends PhysicalO
                     out.append(t.get(j));
                 }
                 }
-            } else if (isToBeFlattenedArray[i] && in instanceof Map.Entry) {
-                Map.Entry entry = (Map.Entry)in;
-                if (knownSize) {
-                    out.set(idx++, entry.getKey());
-                    out.set(idx++, entry.getValue());
-                } else {
-                    out.append(entry.getKey());
-                    out.append(entry.getValue());
-                }
             } else {
                 if (knownSize) {
                     out.set(idx++, in);
@@ -751,12 +738,9 @@ public class POForEach extends PhysicalO
             opsToBeReset.add(sort);
         }
 
-        @Override
-        public void visitCross(POCross c) throws VisitorException {
-            // FIXME: add only if limit is present
-            opsToBeReset.add(c);
-        }
-
+        /* (non-Javadoc)
+         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitProject(org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject)
+         */
         @Override
         public void visitProject(POProject proj) throws VisitorException {
             if(proj instanceof PORelationToExprProject) {

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Fri Feb 24 03:34:37 2017
@@ -56,11 +56,11 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 
-/** This operator implements merge join algorithm to do map side joins.
+/** This operator implements merge join algorithm to do map side joins. 
  *  Currently, only two-way joins are supported. One input of join is identified as left
  *  and other is identified as right. Left input tuples are the input records in map.
  *  Right tuples are read from HDFS by opening right stream.
- *
+ *  
  *    This join doesn't support outer join.
  *    Data is assumed to be sorted in ascending order. It will fail if data is sorted in descending order.
  */
@@ -99,7 +99,7 @@ public class POMergeJoin extends Physica
     private FuncSpec rightLoaderFuncSpec;
 
     private String rightInputFileName;
-
+    
     private String indexFile;
 
     // Buffer to hold accumulated left tuples.
@@ -249,11 +249,12 @@ public class POMergeJoin extends Physica
      * from Tuple to SchemaTuple. This is necessary because we are not getting SchemaTuples
      * from the source, though in the future that is what we would like to do.
      */
-    public static class TuplesToSchemaTupleList extends ArrayList<Tuple> {
+    public static class TuplesToSchemaTupleList {
+        private List<Tuple> tuples;
         private SchemaTupleFactory tf;
 
         public TuplesToSchemaTupleList(int ct, TupleMaker<?> tf) {
-            super(ct);
+            tuples = new ArrayList<Tuple>(ct);
             if (tf instanceof SchemaTupleFactory) {
                 this.tf = (SchemaTupleFactory)tf;
             }
@@ -272,24 +273,24 @@ public class POMergeJoin extends Physica
             }
         }
 
-        @Override
         public boolean add(Tuple t) {
             if (tf != null) {
                 t = convert(t, tf);
             }
-            return super.add(t);
+            return tuples.add(t);
         }
 
-        @Override
         public Tuple get(int i) {
-            return super.get(i);
+            return tuples.get(i);
         }
 
-        @Override
         public int size() {
-            return super.size();
+            return tuples.size();
         }
 
+        public List<Tuple> getList() {
+            return tuples;
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -356,7 +357,7 @@ public class POMergeJoin extends Physica
                 }
                 else{
                     Object rightKey = extractKeysFromTuple(rightInp, 1);
-                    if(null == rightKey) // If we see tuple having null keys in stream, we drop them
+                    if(null == rightKey) // If we see tuple having null keys in stream, we drop them 
                         continue;       // and fetch next tuple.
 
                     int cmpval = ((Comparable)rightKey).compareTo(curJoinKey);
@@ -398,7 +399,7 @@ public class POMergeJoin extends Physica
                             "Last two tuples encountered were: \n"+
                         curJoiningRightTup+ "\n" + (Tuple)rightInp.result ;
                         throw new ExecException(errMsg,errCode);
-                    }
+                    }    
                 }
             }
         }
@@ -429,17 +430,17 @@ public class POMergeJoin extends Physica
                 prevLeftKey+ "\n" + curLeftKey ;
                 throw new ExecException(errMsg,errCode);
             }
-
+ 
         case POStatus.STATUS_EOP:
             if(this.parentPlan.endOfAllInput || isEndOfInput()){
-                // We hit the end on left input.
+                // We hit the end on left input. 
                 // Tuples in bag may still possibly join with right side.
                 curJoinKey = prevLeftKey;
                 curLeftKey = null;
                 if (isEndOfInput()) {
                     leftInputConsumedInSpark = true;
                 }
-                break;
+                break;                
             }
             else    // Fetch next left input.
                 return curLeftInp;
@@ -464,7 +465,7 @@ public class POMergeJoin extends Physica
         // Accumulated tuples with same key on left side.
         // But since we are reading ahead we still haven't checked the read ahead right tuple.
         // Accumulated left tuples may potentially join with that. So, lets check that first.
-
+        
         if((null != prevRightKey) && prevRightKey.equals(prevLeftKey)){
 
             curJoiningRightTup = (Tuple)prevRightInp.result;
@@ -486,17 +487,17 @@ public class POMergeJoin extends Physica
                 slidingToNextRecord = false;
             } else
                 rightInp = getNextRightInp(prevLeftKey);
-
+                
             if(rightInp.returnStatus != POStatus.STATUS_OK)
                 return rightInp;
 
             Object extractedRightKey = extractKeysFromTuple(rightInp, 1);
-
-            if(null == extractedRightKey) // If we see tuple having null keys in stream, we drop them
+            
+            if(null == extractedRightKey) // If we see tuple having null keys in stream, we drop them 
                 continue;       // and fetch next tuple.
-
+            
             Comparable rightKey = (Comparable)extractedRightKey;
-
+            
             if( prevRightKey != null && rightKey.compareTo(prevRightKey) < 0){
                 // Sanity check.
                 int errCode = 1102;
@@ -527,7 +528,7 @@ public class POMergeJoin extends Physica
             else{    // We got ahead on right side. Store currently read right tuple.
                 prevRightKey = rightKey;
                 prevRightInp = rightInp;
-                // Since we didn't find any matching right tuple we throw away the buffered left tuples and add the one read in this function call.
+                // Since we didn't find any matching right tuple we throw away the buffered left tuples and add the one read in this function call. 
                 leftTuples = newLeftTupleArray();
                 leftTuples.add((Tuple)curLeftInp.result);
                 prevLeftInp = curLeftInp;
@@ -554,7 +555,7 @@ public class POMergeJoin extends Physica
             DefaultIndexableLoader loader = (DefaultIndexableLoader)rightLoader;
             loader.setIndexFile(indexFile);
         }
-
+        
         // Pass signature of the loader to rightLoader
         // make a copy of the conf to use in calls to rightLoader.
         rightLoader.setUDFContextSignature(signature);
@@ -607,11 +608,11 @@ public class POMergeJoin extends Physica
                         // run the tuple through the pipeline
                         rightPipelineRoot.attachInput(t);
                         return this.getNextRightInp();
-
+                        
                     }
                     default: // We don't deal with ERR/NULL. just pass them down
                         throwProcessingException(false, null);
-
+                        
                 }
             }
         } catch (IOException e) {
@@ -642,8 +643,8 @@ public class POMergeJoin extends Physica
             int errCode = 2167;
             String errMsg = "LocalRearrange used to extract keys from tuple isn't configured correctly";
             throw new ExecException(errMsg,errCode,PigException.BUG);
-        }
-
+        } 
+          
         return ((Tuple) lrOut.result).get(1);
     }
 
@@ -659,7 +660,7 @@ public class POMergeJoin extends Physica
             noInnerPlanOnRightSide = false;
             this.rightPipelineLeaf = rightPipeline.getLeaves().get(0);
             this.rightPipelineRoot = rightPipeline.getRoots().get(0);
-            this.rightPipelineRoot.setInputs(null);
+            this.rightPipelineRoot.setInputs(null);            
         }
         else
             noInnerPlanOnRightSide = true;
@@ -710,18 +711,18 @@ public class POMergeJoin extends Physica
     public boolean supportsMultipleOutputs() {
         return false;
     }
-
+    
     /**
      * @param rightInputFileName the rightInputFileName to set
      */
     public void setRightInputFileName(String rightInputFileName) {
         this.rightInputFileName = rightInputFileName;
     }
-
+    
     public String getSignature() {
         return signature;
     }
-
+    
     public void setSignature(String signature) {
         this.signature = signature;
     }
@@ -733,12 +734,12 @@ public class POMergeJoin extends Physica
     public String getIndexFile() {
         return indexFile;
     }
-
+    
     @Override
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
         return null;
     }
-
+    
     public LOJoin.JOINTYPE getJoinType() {
         return joinType;
     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java Fri Feb 24 03:34:37 2017
@@ -44,9 +44,6 @@ public class POPoissonSample extends Phy
 
     private transient boolean initialized;
 
-    // num of rows skipped so far
-    private transient int numSkipped;
-
     // num of rows sampled so far
     private transient int numRowsSampled;
 
@@ -92,7 +89,6 @@ public class POPoissonSample extends Phy
     @Override
     public Result getNextTuple() throws ExecException {
         if (!initialized) {
-            numSkipped = 0;
             numRowsSampled = 0;
             avgTupleMemSz = 0;
             rowNum = 0;
@@ -138,7 +134,7 @@ public class POPoissonSample extends Phy
         }
 
         // skip tuples
-        while (numSkipped < skipInterval) {
+        for (long numSkipped  = 0; numSkipped < skipInterval; numSkipped++) {
             res = processInput();
             if (res.returnStatus == POStatus.STATUS_NULL) {
                 continue;
@@ -152,7 +148,6 @@ public class POPoissonSample extends Phy
                 return res;
             }
             rowNum++;
-            numSkipped++;
         }
 
         // skipped enough, get new sample
@@ -178,8 +173,6 @@ public class POPoissonSample extends Phy
 
             rowNum++;
             newSample = res;
-            // reset skipped
-            numSkipped = 0;
             return currentSample;
         }
     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java Fri Feb 24 03:34:37 2017
@@ -125,7 +125,7 @@ public class POReservoirSample extends P
                 }
 
                 // collect samples until input is exhausted
-                int rand = randGen.nextInt(rowProcessed + 1);
+                int rand = randGen.nextInt(rowProcessed);
                 if (rand < numSamples) {
                     samples[rand] = res;
                 }
@@ -133,13 +133,8 @@ public class POReservoirSample extends P
             }
         }
 
-        if (res.returnStatus == POStatus.STATUS_EOP) {
-            if (this.parentPlan.endOfAllInput) {
-                sampleCollectionDone = true;
-            } else {
-                // In case of Split can get EOP in between.
-                return res;
-            }
+        if (this.parentPlan.endOfAllInput && res.returnStatus == POStatus.STATUS_EOP) {
+            sampleCollectionDone = true;
         }
 
         return getSample();

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Fri Feb 24 03:34:37 2017
@@ -51,13 +51,13 @@ public class Packager implements Illustr
     protected DataBag[] bags;
 
     public static enum PackageType {
-        GROUP, JOIN, BLOOMJOIN
+        GROUP, JOIN
     };
 
     protected transient Illustrator illustrator = null;
 
     // The key being worked on
-    protected Object key;
+    Object key;
 
     // marker to indicate if key is a tuple
     protected boolean isKeyTuple = false;
@@ -65,7 +65,7 @@ public class Packager implements Illustr
     protected boolean isKeyCompound = false;
 
     // key's type
-    protected byte keyType;
+    byte keyType;
 
     // The number of inputs to this
     // co-group. 0 indicates a distinct, which means there will only be a

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java Fri Feb 24 03:34:37 2017
@@ -60,7 +60,7 @@ public class StoreFuncDecorator {
 
     private boolean allowErrors() {
         return UDFContext.getUDFContext().getJobConf()
-                .getBoolean(PigConfiguration.PIG_ERROR_HANDLING_ENABLED, false);
+                .getBoolean(PigConfiguration.PIG_ALLOW_STORE_ERRORS, false);
     }
 
     /**

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Fri Feb 24 03:34:37 2017
@@ -162,13 +162,8 @@ public class LoadConverter implements RD
         private SparkEngineConf sparkEngineConf;
         private boolean initialized;
 
-        //LoadConverter#ToTupleFunction is executed more than once in multiquery case causing
-        //invalid number of input records, 'skip' flag below indicates first load is finished.
-        private boolean skip;
-
         public ToTupleFunction(SparkEngineConf sparkEngineConf){
                this.sparkEngineConf = sparkEngineConf;
-
         }
 
         @Override
@@ -177,14 +172,9 @@ public class LoadConverter implements RD
                 long partitionId = TaskContext.get().partitionId();
                 PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, Long.toString(partitionId));
 
-                //We're in POSplit and already counted all input records,
-                //in a multiquery case skip will be set to true after the first load is finished:
-                if (sparkCounters != null && SparkPigStatusReporter.getInstance().getCounters().getCounter(counterGroupName, counterName).getValue() > 0) {
-                    skip=true;
-                }
                 initialized = true;
             }
-            if (sparkCounters != null && disableCounter == false && skip == false) {
+            if (sparkCounters != null && disableCounter == false) {
                 sparkCounters.increment(counterGroupName, counterName, 1L);
             }
             return v1._2();

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Fri Feb 24 03:34:37 2017
@@ -19,14 +19,13 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -44,7 +43,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -58,7 +56,6 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigSecondaryKeyGroupComparator;
@@ -90,6 +87,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
@@ -110,6 +108,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.builtin.DefaultIndexableLoader;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableTuple;
@@ -175,7 +174,6 @@ public class TezDagBuilder extends TezOp
     private PigContext pc;
     private Configuration globalConf;
     private Configuration pigContextConf;
-    private Configuration shuffleVertexManagerBaseConf;
     private FileSystem fs;
     private long intermediateTaskInputSize;
     private Set<String> inputSplitInDiskVertices;
@@ -193,8 +191,6 @@ public class TezDagBuilder extends TezOp
     private String mapTaskLaunchCmdOpts;
     private String reduceTaskLaunchCmdOpts;
 
-    private boolean disableDAGRecovery = false;
-
     public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag,
             Map<String, LocalResource> localResources) {
         super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
@@ -214,10 +210,6 @@ public class TezDagBuilder extends TezOp
         }
     }
 
-    public boolean shouldDisableDAGRecovery() {
-        return disableDAGRecovery;
-    }
-
     private void initialize(PigContext pc) throws IOException {
 
         this.globalConf = ConfigurationUtil.toConfiguration(pc.getProperties(), true);
@@ -225,16 +217,6 @@ public class TezDagBuilder extends TezOp
         this.pigContextConf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
         MRToTezHelper.processMRSettings(pigContextConf, globalConf);
 
-        shuffleVertexManagerBaseConf = new Configuration(false);
-        // Only copy tez.shuffle-vertex-manager config to keep payload size small
-        Iterator<Entry<String, String>> iter = pigContextConf.iterator();
-        while (iter.hasNext()) {
-            Entry<String, String> entry = iter.next();
-            if (entry.getKey().startsWith("tez.shuffle-vertex-manager")) {
-                shuffleVertexManagerBaseConf.set(entry.getKey(), entry.getValue());
-            }
-        }
-
         // Add credentials from binary token file and get tokens for namenodes
         // specified in mapreduce.job.hdfs-servers
         SecurityHelper.populateTokenCache(globalConf, dag.getCredentials());
@@ -283,7 +265,7 @@ public class TezDagBuilder extends TezOp
         if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) == null) {
             // If tez setting is not defined
             MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, mapTaskEnv, true);
-            MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, false);
+            MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, true);
         }
 
         if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) != null) {
@@ -297,7 +279,7 @@ public class TezDagBuilder extends TezOp
 
         try {
             fs = FileSystem.get(globalConf);
-            intermediateTaskInputSize = fs.getDefaultBlockSize(FileLocalizer.getTemporaryResourcePath(pc));
+            intermediateTaskInputSize = HadoopShims.getDefaultBlockSize(fs, FileLocalizer.getTemporaryResourcePath(pc));
         } catch (Exception e) {
             log.warn("Unable to get the block size for temporary directory, defaulting to 128MB", e);
             intermediateTaskInputSize = 134217728L;
@@ -415,11 +397,7 @@ public class TezDagBuilder extends TezOp
                 tezOp.getVertexGroupInfo().setVertexGroup(vertexGroup);
                 POStore store = tezOp.getVertexGroupInfo().getStore();
                 if (store != null) {
-                    String outputKey = store.getOperatorKey().toString();
-                    if (store instanceof POStoreTez) {
-                        outputKey = ((POStoreTez) store).getOutputKey();
-                    }
-                    vertexGroup.addDataSink(outputKey,
+                    vertexGroup.addDataSink(store.getOperatorKey().toString(),
                             DataSinkDescriptor.create(tezOp.getVertexGroupInfo().getStoreOutputDescriptor(),
                             OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), dag.getCredentials()));
                 }
@@ -463,14 +441,7 @@ public class TezDagBuilder extends TezOp
 
         Configuration conf = new Configuration(pigContextConf);
 
-        if (edge.needsDistinctCombiner()) {
-            conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS,
-                    MRCombiner.class.getName());
-            conf.set(MRJobConfig.COMBINE_CLASS_ATTR,
-                    DistinctCombiner.Combine.class.getName());
-            log.info("Setting distinct combiner class between "
-                    + from.getOperatorKey() + " and " + to.getOperatorKey());
-        } else if (!combinePlan.isEmpty()) {
+        if (!combinePlan.isEmpty()) {
             udfContextSeparator.serializeUDFContextForEdge(conf, from, to, UDFType.USERFUNC);
             addCombiner(combinePlan, to, conf, isMergedInput);
         }
@@ -479,7 +450,7 @@ public class TezDagBuilder extends TezOp
                 POLocalRearrangeTez.class);
 
         for (POLocalRearrangeTez lr : lrs) {
-            if (lr.containsOutputKey(to.getOperatorKey().toString())) {
+            if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
                 byte keyType = lr.getKeyType();
                 setIntermediateOutputKeyValue(keyType, conf, to, lr.isConnectedToPackage(), isMergedInput);
                 // In case of secondary key sort, main key type is the actual key type
@@ -508,8 +479,7 @@ public class TezDagBuilder extends TezOp
 
         conf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
         conf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
-        conf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pc.getExecType().isLocal());
-        conf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pc.getLog4jProperties()));
+        conf.set("pig.pigContext", serializedPigContext);
         conf.set("udf.import.list", serializedUDFImportList);
 
         if(to.isGlobalSort() || to.isLimitAfterSort()){
@@ -540,36 +510,26 @@ public class TezDagBuilder extends TezOp
 
         UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
         out.setUserPayload(payLoad);
-        in.setUserPayload(payLoad);
 
-        // Remove combiner and reset payload
         if (!combinePlan.isEmpty()) {
             boolean noCombineInReducer = false;
-            boolean noCombineInMapper = edge.getCombinerInMap() == null ? false : !edge.getCombinerInMap();
             String reducerNoCombiner = globalConf.get(PigConfiguration.PIG_EXEC_NO_COMBINER_REDUCER);
-            if (edge.getCombinerInReducer() != null) {
-                noCombineInReducer = !edge.getCombinerInReducer();
-            } else if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) {
+            if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) {
                 noCombineInReducer = TezCompilerUtil.bagDataTypeInCombinePlan(combinePlan);
             } else {
                 noCombineInReducer = Boolean.parseBoolean(reducerNoCombiner);
             }
-            if (noCombineInReducer || noCombineInMapper) {
+            if (noCombineInReducer) {
                 log.info("Turning off combiner in reducer vertex " + to.getOperatorKey() + " for edge from " + from.getOperatorKey());
                 conf.unset(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS);
                 conf.unset(MRJobConfig.COMBINE_CLASS_ATTR);
                 conf.unset("pig.combinePlan");
                 conf.unset("pig.combine.package");
                 conf.unset("pig.map.keytype");
-                UserPayload payLoadWithoutCombiner = TezUtils.createUserPayloadFromConf(conf);
-                if (noCombineInMapper) {
-                    out.setUserPayload(payLoadWithoutCombiner);
-                }
-                if (noCombineInReducer) {
-                    in.setUserPayload(payLoadWithoutCombiner);
-                }
+                payLoad = TezUtils.createUserPayloadFromConf(conf);
             }
         }
+        in.setUserPayload(payLoad);
 
         if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && to.getVertexParallelism()==-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
             // Use custom edge
@@ -633,8 +593,6 @@ public class TezDagBuilder extends TezOp
         setOutputFormat(job);
         payloadConf.set("udf.import.list", serializedUDFImportList);
         payloadConf.set("exectype", "TEZ");
-        payloadConf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pc.getExecType().isLocal());
-        payloadConf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pc.getLog4jProperties()));
 
         // Process stores
         LinkedList<POStore> stores = processStores(tezOp, payloadConf, job);
@@ -653,7 +611,11 @@ public class TezDagBuilder extends TezOp
             payloadConf.set(PigInputFormat.PIG_INPUT_SIGNATURES, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists()));
             payloadConf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits()));
             inputPayLoad = new Configuration(payloadConf);
+            if (tezOp.getLoaderInfo().getLoads().get(0).getLoadFunc() instanceof DefaultIndexableLoader) {
+                inputPayLoad.set("pig.pigContext", serializedPigContext);
+            }
         }
+        payloadConf.set("pig.pigContext", serializedPigContext);
 
         if (tezOp.getSampleOperator() != null) {
             payloadConf.set(PigProcessor.SAMPLE_VERTEX, tezOp.getSampleOperator().getOperatorKey().toString());
@@ -727,7 +689,7 @@ public class TezDagBuilder extends TezOp
                             PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class);
                     for (POLocalRearrangeTez lr : lrs) {
                         if (lr.isConnectedToPackage()
-                                && lr.containsOutputKey(tezOp.getOperatorKey().toString())) {
+                                && lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) {
                             localRearrangeMap.put((int) lr.getIndex(), inputKey);
                             if (isVertexGroup) {
                                 isMergedInput = true;
@@ -810,25 +772,9 @@ public class TezDagBuilder extends TezOp
 
         String vmPluginName = null;
         Configuration vmPluginConf = null;
-        boolean containScatterGather = false;
-        boolean containCustomPartitioner = false;
-        for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
-            if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) {
-                containScatterGather = true;
-            }
-            if (edge.partitionerClass != null) {
-                containCustomPartitioner = true;
-            }
-        }
-
-        if(containScatterGather) {
-            vmPluginName = ShuffleVertexManager.class.getName();
-            vmPluginConf = new Configuration(shuffleVertexManagerBaseConf);
-        }
 
         // Set the right VertexManagerPlugin
         if (tezOp.getEstimatedParallelism() != -1) {
-            boolean autoParallelism = false;
             if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
                 if (tezOp.getVertexParallelism()==-1 && (
                         tezOp.isGlobalSort() &&getPlan().getPredecessors(tezOp).size()==1||
@@ -837,12 +783,33 @@ public class TezDagBuilder extends TezOp
                     // to decrease/increase parallelism of sorting vertex dynamically
                     // based on the numQuantiles calculated by sample aggregation vertex
                     vmPluginName = PartitionerDefinedVertexManager.class.getName();
-                    autoParallelism = true;
                     log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString());
                 }
             } else {
+                boolean containScatterGather = false;
+                boolean containCustomPartitioner = false;
+                for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
+                    if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) {
+                        containScatterGather = true;
+                    }
+                    if (edge.partitionerClass!=null) {
+                        containCustomPartitioner = true;
+                    }
+                }
                 if (containScatterGather && !containCustomPartitioner) {
-
+                    vmPluginConf = (vmPluginConf == null) ? new Configuration(pigContextConf) : vmPluginConf;
+                    // Use auto-parallelism feature of ShuffleVertexManager to dynamically
+                    // reduce the parallelism of the vertex
+                    if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
+                            && !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()) {
+                        vmPluginName = PigGraceShuffleVertexManager.class.getName();
+                        tezOp.setUseGraceParallelism(true);
+                        vmPluginConf.set("pig.tez.plan", getSerializedTezPlan());
+                        vmPluginConf.set("pig.pigContext", serializedPigContext);
+                    } else {
+                        vmPluginName = ShuffleVertexManager.class.getName();
+                    }
+                    vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
                     // For Intermediate reduce, set the bytes per reducer to be block size.
                     long bytesPerReducer = intermediateTaskInputSize;
                     // If there are store statements, use BYTES_PER_REDUCER_PARAM configured by user.
@@ -851,8 +818,8 @@ public class TezDagBuilder extends TezOp
                     // In Tez, numReducers=(map output size/bytesPerReducer) we need lower values to avoid skews in reduce
                     // as map input sizes are mostly always high compared to map output.
                     if (stores.size() > 0) {
-                        if (pigContextConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) {
-                            bytesPerReducer = pigContextConf.getLong(
+                        if (vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) {
+                            bytesPerReducer = vmPluginConf.getLong(
                                             InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                                             InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
                         } else if (tezOp.isGroupBy()) {
@@ -861,28 +828,10 @@ public class TezDagBuilder extends TezOp
                             bytesPerReducer = SHUFFLE_BYTES_PER_REDUCER_DEFAULT;
                         }
                     }
-
-                    // Use auto-parallelism feature of ShuffleVertexManager to dynamically
-                    // reduce the parallelism of the vertex. Use PigGraceShuffleVertexManager
-                    // instead of ShuffleVertexManager if pig.tez.grace.parallelism is turned on
-                    if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
-                            && !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()
-                            && tezOp.getCrossKeys() == null) {
-                        vmPluginName = PigGraceShuffleVertexManager.class.getName();
-                        tezOp.setUseGraceParallelism(true);
-                        vmPluginConf.set("pig.tez.plan", getSerializedTezPlan());
-                        vmPluginConf.set(PigImplConstants.PIG_CONTEXT, serializedPigContext);
-                        vmPluginConf.setLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, bytesPerReducer);
-                    }
-                    vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
                     vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, bytesPerReducer);
-                    autoParallelism = true;
                     log.info("Set auto parallelism for vertex " + tezOp.getOperatorKey().toString());
                 }
             }
-            if (globalConf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY, false) && autoParallelism) {
-                disableDAGRecovery = true;
-            }
         }
         if (tezOp.isLimit() && (vmPluginName == null || vmPluginName.equals(PigGraceShuffleVertexManager.class.getName())||
                 vmPluginName.equals(ShuffleVertexManager.class.getName()))) {
@@ -1460,12 +1409,22 @@ public class TezDagBuilder extends TezOp
 
     private void setOutputFormat(org.apache.hadoop.mapreduce.Job job) {
         // the OutputFormat we report to Hadoop is always PigOutputFormat which
-        // can be wrapped with LazyOutputFormat provided if PigConfiguration.PIG_OUTPUT_LAZY is set
+        // can be wrapped with LazyOutputFormat if it is supported by
+        // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
         if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) {
-            LazyOutputFormat.setOutputFormatClass(job,PigOutputFormatTez.class);
+            try {
+                Class<?> clazz = PigContext
+                        .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
+                Method method = clazz.getMethod("setOutputFormatClass",
+                        org.apache.hadoop.mapreduce.Job.class, Class.class);
+                method.invoke(null, job, PigOutputFormatTez.class);
+            } catch (Exception e) {
+                job.setOutputFormatClass(PigOutputFormatTez.class);
+                log.warn(PigConfiguration.PIG_OUTPUT_LAZY
+                        + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
+            }
         } else {
             job.setOutputFormatClass(PigOutputFormatTez.class);
         }
     }
-
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Fri Feb 24 03:34:37 2017
@@ -30,11 +30,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.pig.PigConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
-import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
 import org.apache.tez.client.TezClient;
@@ -56,7 +51,7 @@ import com.google.common.collect.Maps;
  */
 public class TezJob implements Runnable {
     private static final Log log = LogFactory.getLog(TezJob.class);
-    private TezConfiguration conf;
+    private Configuration conf;
     private EnumSet<StatusGetOpts> statusGetOpts;
     private Map<String, LocalResource> requestAMResources;
     private ApplicationId appId;
@@ -74,71 +69,31 @@ public class TezJob implements Runnable
 
     public TezJob(TezConfiguration conf, DAG dag,
             Map<String, LocalResource> requestAMResources,
-            TezOperPlan tezPlan) throws IOException {
+            int estimatedTotalParallelism) throws IOException {
         this.conf = conf;
         this.dag = dag;
         this.requestAMResources = requestAMResources;
         this.reuseSession = conf.getBoolean(PigConfiguration.PIG_TEZ_SESSION_REUSE, true);
         this.statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
-        tezJobConf = new TezJobConfig(tezPlan);
+        tezJobConf = new TezJobConfig(estimatedTotalParallelism);
     }
 
     static class TezJobConfig {
 
         private int estimatedTotalParallelism = -1;
-        private int maxOutputsinSingleVertex;
-        private int totalVertices  = 0;
 
-        public TezJobConfig(TezOperPlan tezPlan) throws VisitorException {
-            this.estimatedTotalParallelism = tezPlan.getEstimatedTotalParallelism();
-            MaxOutputsFinder finder = new MaxOutputsFinder(tezPlan);
-            finder.visit();
-            this.maxOutputsinSingleVertex = finder.getMaxOutputsinSingleVertex();
-            this.totalVertices = finder.getTotalVertices();
+        public TezJobConfig(int estimatedTotalParallelism) {
+            this.estimatedTotalParallelism = estimatedTotalParallelism;
         }
 
         public int getEstimatedTotalParallelism() {
             return estimatedTotalParallelism;
         }
 
-        public int getMaxOutputsinSingleVertex() {
-            return maxOutputsinSingleVertex;
+        public void setEstimatedTotalParallelism(int estimatedTotalParallelism) {
+            this.estimatedTotalParallelism = estimatedTotalParallelism;
         }
 
-        public int getTotalVertices() {
-            return totalVertices;
-        }
-
-    }
-
-    private static class MaxOutputsFinder extends TezOpPlanVisitor {
-
-        private int maxOutputsinSingleVertex  = 1;
-        private int totalVertices  = 0;
-
-        public MaxOutputsFinder(TezOperPlan plan) {
-            super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
-        }
-
-        public int getMaxOutputsinSingleVertex() {
-            return maxOutputsinSingleVertex;
-        }
-
-        public int getTotalVertices() {
-            return totalVertices;
-        }
-
-        @Override
-        public void visitTezOp(TezOperator tezOperator) throws VisitorException {
-            if (!tezOperator.isVertexGroup()) {
-                totalVertices++;
-                int outputs = tezOperator.outEdges.keySet().size();
-                maxOutputsinSingleVertex = maxOutputsinSingleVertex > outputs ? maxOutputsinSingleVertex : outputs;
-            }
-        }
-
-
-
     }
 
     public DAG getDAG() {

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Fri Feb 24 03:34:37 2017
@@ -19,7 +19,6 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
@@ -31,7 +30,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.pig.PigException;
-import org.apache.pig.backend.hadoop.PigATSClient;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
@@ -52,12 +50,11 @@ public class TezJobCompiler {
     private static final Log log = LogFactory.getLog(TezJobCompiler.class);
 
     private PigContext pigContext;
-    private Configuration conf;
-    private boolean disableDAGRecovery;
+    private TezConfiguration tezConf;
 
     public TezJobCompiler(PigContext pigContext, Configuration conf) throws IOException {
         this.pigContext = pigContext;
-        this.conf = conf;
+        this.tezConf = new TezConfiguration(conf);
     }
 
     public DAG buildDAG(TezPlanContainerNode tezPlanNode, Map<String, LocalResource> localResources)
@@ -67,7 +64,6 @@ public class TezJobCompiler {
         TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlanNode.getTezOperPlan(), tezDag, localResources);
         dagBuilder.visit();
         dagBuilder.avoidContainerReuseIfInputSplitInDisk();
-        disableDAGRecovery = dagBuilder.shouldDisableDAGRecovery();
         return tezDag;
     }
 
@@ -89,7 +85,6 @@ public class TezJobCompiler {
         return job;
     }
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
     private TezJob getJob(TezPlanContainerNode tezPlanNode, TezPlanContainer planContainer)
             throws JobCreationException {
         try {
@@ -112,34 +107,8 @@ public class TezJobCompiler {
             }
             DAG tezDag = buildDAG(tezPlanNode, localResources);
             tezDag.setDAGInfo(createDagInfo(TezScriptState.get().getScript()));
-            // set Tez caller context
-            // Reflection for the following code since it is only available since tez 0.8.1:
-            // CallerContext context = CallerContext.create(ATSService.CallerContext, ATSService.getPigAuditId(pigContext),
-            //     ATSService.EntityType, "");
-            // tezDag.setCallerContext(context);
-            Class callerContextClass = null;
-            try {
-                callerContextClass = Class.forName("org.apache.tez.client.CallerContext");
-            } catch (ClassNotFoundException e) {
-                // If pre-Tez 0.8.1, skip setting CallerContext
-            }
-            if (callerContextClass != null) {
-                Method builderBuildMethod = callerContextClass.getMethod("create", String.class,
-                        String.class, String.class, String.class);
-                Object context = builderBuildMethod.invoke(null, PigATSClient.CALLER_CONTEXT,
-                        PigATSClient.getPigAuditId(pigContext), PigATSClient.ENTITY_TYPE, "");
-                Method dagSetCallerContext = tezDag.getClass().getMethod("setCallerContext",
-                        context.getClass());
-                dagSetCallerContext.invoke(tezDag, context);
-            }
             log.info("Total estimated parallelism is " + tezPlan.getEstimatedTotalParallelism());
-            TezConfiguration tezConf = new TezConfiguration(conf);
-            if (disableDAGRecovery
-                    && tezConf.getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
-                            TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) {
-                tezConf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false);
-            }
-            return new TezJob(tezConf, tezDag, localResources, tezPlan);
+            return new TezJob(tezConf, tezDag, localResources, tezPlan.getEstimatedTotalParallelism());
         } catch (Exception e) {
             int errCode = 2017;
             String msg = "Internal error creating job configuration.";

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Fri Feb 24 03:34:37 2017
@@ -22,7 +22,6 @@ import java.io.PrintStream;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -167,7 +166,7 @@ public class TezLauncher extends Launche
         tezStats = new TezPigScriptStats(pc);
         PigStats.start(tezStats);
 
-        conf.setIfUnset(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, "true");
+        conf.set(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, "true");
         TezJobCompiler jc = new TezJobCompiler(pc, conf);
         TezPlanContainer tezPlanContainer = compile(php, pc);
 
@@ -175,10 +174,6 @@ public class TezLauncher extends Launche
         tezScriptState.emitInitialPlanNotification(tezPlanContainer);
         tezScriptState.emitLaunchStartedNotification(tezPlanContainer.size()); //number of DAGs to Launch
 
-        boolean stop_on_failure =
-                Boolean.valueOf(pc.getProperties().getProperty("stop.on.failure", "false"));
-        boolean stoppedOnFailure = false;
-
         TezPlanContainerNode tezPlanContainerNode;
         TezOperPlan tezPlan;
         int processedDAGs = 0;
@@ -257,18 +252,7 @@ public class TezLauncher extends Launche
                     ((tezPlanContainer.size() - processedDAGs)/tezPlanContainer.size()) * 100);
             }
             handleUnCaughtException(pc);
-            boolean tezDAGSucceeded = reporter.notifyFinishedOrFailed();
-            tezPlanContainer.updatePlan(tezPlan, tezDAGSucceeded);
-            // if stop_on_failure is enabled, we need to stop immediately when any job has failed
-            if (!tezDAGSucceeded) {
-                if (stop_on_failure) {
-                    stoppedOnFailure = true;
-                    break;
-                } else {
-                    log.warn("Ooops! Some job has failed! Specify -stop_on_failure if you "
-                            + "want Pig to stop immediately on failure.");
-                }
-            }
+            tezPlanContainer.updatePlan(tezPlan, reporter.notifyFinishedOrFailed());
         }
 
         tezStats.finish();
@@ -295,11 +279,6 @@ public class TezLauncher extends Launche
             }
         }
 
-        if (stoppedOnFailure) {
-            throw new ExecException("Stopping execution on job failure with -stop_on_failure option", 6017,
-                    PigException.REMOTE_ENVIRONMENT);
-        }
-
         return tezStats;
     }
 
@@ -423,11 +402,9 @@ public class TezLauncher extends Launche
         TezCompiler comp = new TezCompiler(php, pc);
         comp.compile();
         TezPlanContainer planContainer = comp.getPlanContainer();
-        // Doing a sort so that test plan printed remains same between jdk7 and jdk8
-        List<OperatorKey> opKeys = new ArrayList<>(planContainer.getKeys().keySet());
-        Collections.sort(opKeys);
-        for (OperatorKey opKey : opKeys) {
-            TezOperPlan tezPlan = planContainer.getOperator(opKey).getTezOperPlan();
+        for (Map.Entry<OperatorKey, TezPlanContainerNode> entry : planContainer
+                .getKeys().entrySet()) {
+            TezOperPlan tezPlan = entry.getValue().getTezOperPlan();
             optimize(tezPlan, pc);
         }
         return planContainer;
@@ -522,7 +499,7 @@ public class TezLauncher extends Launche
 
     @Override
     public void killJob(String jobID, Configuration conf) throws BackendException {
-        if (runningJob != null && runningJob.getApplicationId().toString().equals(jobID)) {
+        if (runningJob != null && runningJob.getApplicationId().toString() == jobID) {
             try {
                 runningJob.killJob();
             } catch (Exception e) {

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Fri Feb 24 03:34:37 2017
@@ -39,8 +39,6 @@ import org.apache.pig.PigConfiguration;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 
-import com.google.common.annotations.VisibleForTesting;
-
 public class TezResourceManager {
     private static TezResourceManager instance = null;
     private boolean inited = false;
@@ -61,7 +59,6 @@ public class TezResourceManager {
     /**
      * This method is only used by test code to reset state.
      */
-    @VisibleForTesting
     public static void dropInstance() {
         instance = null;
     }
@@ -69,7 +66,7 @@ public class TezResourceManager {
     public void init(PigContext pigContext, Configuration conf) throws IOException {
         if (!inited) {
             this.resourcesDir = FileLocalizer.getTemporaryResourcePath(pigContext);
-            this.remoteFs = resourcesDir.getFileSystem(conf);
+            this.remoteFs = FileSystem.get(conf);
             this.conf = conf;
             this.pigContext = pigContext;
             this.inited = true;

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Fri Feb 24 03:34:37 2017
@@ -18,9 +18,7 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
 import java.io.IOException;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Calendar;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -31,11 +29,9 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezJob.TezJobConfig;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.tez.TezScriptState;
 import org.apache.tez.client.TezAppMasterStatus;
@@ -50,13 +46,13 @@ public class TezSessionManager {
     private static final Log log = LogFactory.getLog(TezSessionManager.class);
 
     static {
-        Utils.addShutdownHookWithPriority(new Runnable() {
+        Runtime.getRuntime().addShutdownHook(new Thread() {
 
             @Override
             public void run() {
                 TezSessionManager.shutdown();
             }
-        }, PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY);
+        });
     }
 
     private static ReentrantReadWriteLock sessionPoolLock = new ReentrantReadWriteLock();
@@ -65,17 +61,11 @@ public class TezSessionManager {
     private TezSessionManager() {
     }
 
-    private static class SessionInfo {
-
-        public SessionInfo(TezClient session, TezConfiguration config, Map<String, LocalResource> resources) {
+    public static class SessionInfo {
+        SessionInfo(TezClient session, Map<String, LocalResource> resources) {
             this.session = session;
-            this.config = config;
             this.resources = resources;
         }
-
-        public TezConfiguration getConfig() {
-            return config;
-        }
         public Map<String, LocalResource> getResources() {
             return resources;
         }
@@ -87,23 +77,20 @@ public class TezSessionManager {
         }
         private TezClient session;
         private Map<String, LocalResource> resources;
-        private TezConfiguration config;
         private boolean inUse = false;
     }
 
     private static List<SessionInfo> sessionPool = new ArrayList<SessionInfo>();
 
-    private static SessionInfo createSession(TezConfiguration amConf,
+    private static SessionInfo createSession(Configuration conf,
             Map<String, LocalResource> requestedAMResources, Credentials creds,
             TezJobConfig tezJobConf) throws TezException, IOException,
             InterruptedException {
-        MRToTezHelper.translateMRSettingsForTezAM(amConf);
+        TezConfiguration amConf = MRToTezHelper.getDAGAMConfFromMRConf(conf);
         TezScriptState ss = TezScriptState.get();
         ss.addDAGSettingsToConf(amConf);
-        if (amConf.getBoolean(PigConfiguration.PIG_TEZ_CONFIGURE_AM_MEMORY, true)) {
-            adjustAMConfig(amConf, tezJobConf);
-        }
-        String jobName = amConf.get(PigContext.JOB_NAME, "pig");
+        adjustAMConfig(amConf, tezJobConf);
+        String jobName = conf.get(PigContext.JOB_NAME, "pig");
         TezClient tezClient = TezClient.create(jobName, amConf, true, requestedAMResources, creds);
         try {
             tezClient.start();
@@ -117,10 +104,12 @@ public class TezSessionManager {
             tezClient.stop();
             throw new RuntimeException(e);
         }
-        return new SessionInfo(tezClient, amConf, requestedAMResources);
+        return new SessionInfo(tezClient, requestedAMResources);
     }
 
     private static void adjustAMConfig(TezConfiguration amConf, TezJobConfig tezJobConf) {
+        int requiredAMMaxHeap = -1;
+        int requiredAMResourceMB = -1;
         String amLaunchOpts = amConf.get(
                 TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
                 TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT);
@@ -133,10 +122,8 @@ public class TezSessionManager {
 
             // Need more room for native memory/virtual address space
             // when close to 4G due to 32-bit jvm 4G limit
-            int maxAMHeap = Utils.is64bitJVM() ? 3584 : 3200;
-            int maxAMResourceMB = 4096;
-            int requiredAMResourceMB = maxAMResourceMB;
-            int requiredAMMaxHeap = maxAMHeap;
+            int minAMMaxHeap = 3200;
+            int minAMResourceMB = 4096;
 
             // Rough estimation. For 5K tasks 1G Xmx and 1.5G resource.mb
             // Increment container size by 512 mb for every additional 5K tasks.
@@ -148,38 +135,22 @@ public class TezSessionManager {
             //     5000 and above  - 1024Xmx, 1536 (512 native memory)
             for (int taskCount = 30000; taskCount >= 5000; taskCount-=5000) {
                 if (tezJobConf.getEstimatedTotalParallelism() >= taskCount) {
+                    requiredAMMaxHeap = minAMMaxHeap;
+                    requiredAMResourceMB = minAMResourceMB;
                     break;
                 }
-                requiredAMResourceMB = requiredAMResourceMB - 512;
-                requiredAMMaxHeap = requiredAMResourceMB - 512;
-            }
-
-            if (tezJobConf.getTotalVertices() > 30) {
-                //Add 512 mb per 30 vertices
-                int additionaMem = 512 * (tezJobConf.getTotalVertices() / 30);
-                requiredAMResourceMB = requiredAMResourceMB + additionaMem;
-                requiredAMMaxHeap = requiredAMResourceMB - 512;
-            }
-
-            if (tezJobConf.getMaxOutputsinSingleVertex() > 10) {
-                //Add 256 mb per 5 outputs if a vertex has more than 10 outputs
-                int additionaMem = 256 * (tezJobConf.getMaxOutputsinSingleVertex() / 5);
-                requiredAMResourceMB = requiredAMResourceMB + additionaMem;
-                requiredAMMaxHeap = requiredAMResourceMB - 512;
+                minAMResourceMB = minAMResourceMB - 512;
+                minAMMaxHeap = minAMResourceMB - 512;
             }
 
-            requiredAMResourceMB = Math.min(maxAMResourceMB, requiredAMResourceMB);
-            requiredAMMaxHeap = Math.min(maxAMHeap, requiredAMMaxHeap);
-
             if (requiredAMResourceMB > -1 && configuredAMResourceMB < requiredAMResourceMB) {
                 amConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, requiredAMResourceMB);
                 log.info("Increasing "
                         + TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB + " from "
                         + configuredAMResourceMB + " to "
                         + requiredAMResourceMB
-                        + " as total estimated tasks = " + tezJobConf.getEstimatedTotalParallelism()
-                        + ", total vertices = " + tezJobConf.getTotalVertices()
-                        + ", max outputs = " + tezJobConf.getMaxOutputsinSingleVertex());
+                        + " as the number of total estimated tasks is "
+                        + tezJobConf.getEstimatedTotalParallelism());
 
                 if (requiredAMMaxHeap > -1 && configuredAMMaxHeap < requiredAMMaxHeap) {
                     amConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
@@ -187,9 +158,8 @@ public class TezSessionManager {
                     log.info("Increasing Tez AM Heap Size from "
                             + configuredAMMaxHeap + "M to "
                             + requiredAMMaxHeap
-                            + "M as total estimated tasks = " + tezJobConf.getEstimatedTotalParallelism()
-                            + ", total vertices = " + tezJobConf.getTotalVertices()
-                            + ", max outputs = " + tezJobConf.getMaxOutputsinSingleVertex());
+                            + "M as the number of total estimated tasks is "
+                            + tezJobConf.getEstimatedTotalParallelism());
                     log.info("Value of " + TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS + " is now "
                             + amConf.get(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS));
                 }
@@ -208,22 +178,7 @@ public class TezSessionManager {
         return true;
     }
 
-    private static boolean validateSessionConfig(SessionInfo currentSession,
-            Configuration newSessionConfig)
-            throws TezException, IOException {
-        // If DAG recovery is disabled for one and enabled for another, do not reuse
-        if (currentSession.getConfig().getBoolean(
-                    TezConfiguration.DAG_RECOVERY_ENABLED,
-                    TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)
-                != newSessionConfig.getBoolean(
-                        TezConfiguration.DAG_RECOVERY_ENABLED,
-                        TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) {
-            return false;
-        }
-        return true;
-    }
-
-    static TezClient getClient(TezConfiguration conf, Map<String, LocalResource> requestedAMResources,
+    static TezClient getClient(Configuration conf, Map<String, LocalResource> requestedAMResources,
             Credentials creds, TezJobConfig tezJobConf) throws TezException, IOException, InterruptedException {
         List<SessionInfo> sessionsToRemove = new ArrayList<SessionInfo>();
         SessionInfo newSession = null;
@@ -241,8 +196,7 @@ public class TezSessionManager {
                         sessionsToRemove.add(sessionInfo);
                     } else if (!sessionInfo.inUse
                             && appMasterStatus.equals(TezAppMasterStatus.READY)
-                            && validateSessionResources(sessionInfo,requestedAMResources)
-                            && validateSessionConfig(sessionInfo, conf)) {
+                            && validateSessionResources(sessionInfo,requestedAMResources)) {
                         sessionInfo.inUse = true;
                         return sessionInfo.session;
                     }
@@ -299,11 +253,6 @@ public class TezSessionManager {
                 synchronized (sessionInfo) {
                     if (sessionInfo.session == session) {
                         log.info("Stopping Tez session " + session);
-                        String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
-                                    .format(Calendar.getInstance().getTime());
-                        System.err.println(timeStamp + " Shutting down Tez session "
-                                + ", sessionName=" + session.getClientName()
-                                + ", applicationId=" + session.getAppMasterApplicationId());
                         session.stop();
                         sessionToRemove = sessionInfo;
                         break;
@@ -330,30 +279,19 @@ public class TezSessionManager {
             shutdown = true;
             for (SessionInfo sessionInfo : sessionPool) {
                 synchronized (sessionInfo) {
-                    TezClient session = sessionInfo.session;
                     try {
-                        String timeStamp = new SimpleDateFormat(
-                                "yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
-                        if (session.getAppMasterStatus().equals(
+                        if (sessionInfo.session.getAppMasterStatus().equals(
                                 TezAppMasterStatus.SHUTDOWN)) {
                             log.info("Tez session is already shutdown "
-                                    + session);
-                            System.err.println(timeStamp
-                                    + " Tez session is already shutdown " + session
-                                    + ", sessionName=" + session.getClientName()
-                                    + ", applicationId=" + session.getAppMasterApplicationId());
+                                    + sessionInfo.session);
                             continue;
                         }
-                        log.info("Shutting down Tez session " + session);
-                        // Since hadoop calls org.apache.log4j.LogManager.shutdown();
-                        // the log.info message is not displayed with shutdown hook in Oozie
-                        System.err.println(timeStamp + " Shutting down Tez session "
-                                + ", sessionName=" + session.getClientName()
-                                + ", applicationId=" + session.getAppMasterApplicationId());
-                        session.stop();
+                        log.info("Shutting down Tez session "
+                                + sessionInfo.session);
+                        sessionInfo.session.stop();
                     } catch (Exception e) {
                         log.error("Error shutting down Tez session "
-                                + session, e);
+                                + sessionInfo.session, e);
                     }
                 }
             }