Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/24 03:34:40 UTC

svn commit: r1784224 [7/17] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelo...

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Fri Feb 24 03:34:37 2017
@@ -62,6 +62,7 @@ import org.apache.tez.dag.api.EdgeProper
  */
 public class TezOperDependencyParallelismEstimator implements TezParallelismEstimator {
 
+    static private int maxTaskCount;
     static final double DEFAULT_FLATTEN_FACTOR = 10;
     static final double DEFAULT_FILTER_FACTOR = 0.7;
     static final double DEFAULT_LIMIT_FACTOR = 0.1;
@@ -75,8 +76,6 @@ public class TezOperDependencyParallelis
     static final double DEFAULT_AGGREGATION_FACTOR = 0.7;
 
     private PigContext pc;
-    private int maxTaskCount;
-    private long bytesPerReducer;
 
     @Override
     public void setPigContext(PigContext pc) {
@@ -95,18 +94,16 @@ public class TezOperDependencyParallelis
         maxTaskCount = conf.getInt(PigReducerEstimator.MAX_REDUCER_COUNT_PARAM,
                 PigReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
 
-        bytesPerReducer = conf.getLong(PigReducerEstimator.BYTES_PER_REDUCER_PARAM, PigReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
-
-        // If we have already estimated parallelism, use that one
-        if (tezOper.getEstimatedParallelism() != -1) {
-            return tezOper.getEstimatedParallelism();
-        }
-
         // If parallelism is set explicitly, respect it
         if (!tezOper.isIntermediateReducer() && tezOper.getRequestedParallelism()!=-1) {
             return tezOper.getRequestedParallelism();
         }
 
+        // If we have already estimated parallelism, use that one
+        if (tezOper.getEstimatedParallelism()!=-1) {
+            return tezOper.getEstimatedParallelism();
+        }
+
         List<TezOperator> preds = plan.getPredecessors(tezOper);
         if (preds==null) {
             throw new IOException("Cannot estimate parallelism for source vertex");
@@ -133,12 +130,6 @@ public class TezOperDependencyParallelis
                 boolean applyFactor = !tezOper.isUnion();
                 if (!pred.isVertexGroup() && applyFactor) {
                     predParallelism = predParallelism * pred.getParallelismFactor(tezOper);
-                    if (pred.getTotalInputFilesSize() > 0) {
-                        // Estimate similar to mapreduce and use the maximum of two
-                        int parallelismBySize = (int) Math.ceil((double) pred
-                                .getTotalInputFilesSize() / bytesPerReducer);
-                        predParallelism = Math.max(predParallelism, parallelismBySize);
-                    }
                 }
                 estimatedParallelism += predParallelism;
             }
@@ -166,7 +157,9 @@ public class TezOperDependencyParallelis
         }
 
         if (roundedEstimatedParallelism == 0) {
-            roundedEstimatedParallelism = 1; // We need to produce empty output file
+            throw new IOException("Estimated parallelism for "
+                    + tezOper.getOperatorKey().toString()
+                    + " is 0 which is unexpected");
         }
 
         return roundedEstimatedParallelism;
@@ -203,7 +196,7 @@ public class TezOperDependencyParallelis
             if (successor != null) {
                 // Map side combiner
                 TezEdgeDescriptor edge = tezOp.outEdges.get(successor.getOperatorKey());
-                if (!edge.combinePlan.isEmpty() || edge.needsDistinctCombiner()) {
+                if (!edge.combinePlan.isEmpty()) {
                     if (successor.isDistinct()) {
                         factor = DEFAULT_DISTINCT_FACTOR;
                     } else {

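The hunks above reorder the parallelism checks so that an explicit PARALLEL request is honored before a previously estimated value, drop the bytes-per-reducer fallback, and throw instead of silently defaulting to a parallelism of 1. As a hedged sketch (not the committed code; names and signatures are illustrative), the remaining estimation logic amounts to scaling each predecessor's parallelism by a per-operator factor, summing, and capping at the configured maximum task count:

    // Illustrative sketch only: per-predecessor scaling by factors such as
    // DEFAULT_FILTER_FACTOR (0.7) or DEFAULT_LIMIT_FACTOR (0.1), then a cap
    // at the maximum reducer count. A zero result is now treated as an error
    // by the estimator rather than being bumped to 1.
    static int estimateParallelism(int[] predParallelisms, double[] factors, int maxTaskCount) {
        double estimated = 0;
        for (int i = 0; i < predParallelisms.length; i++) {
            estimated += predParallelisms[i] * factors[i];
        }
        int rounded = (int) Math.ceil(estimated);
        return Math.min(rounded, maxTaskCount);
    }
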
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java Fri Feb 24 03:34:37 2017
@@ -29,7 +29,6 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigConfiguration;
-import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -45,7 +44,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
@@ -54,6 +52,7 @@ import org.apache.pig.builtin.AvroStorag
 import org.apache.pig.builtin.JsonStorage;
 import org.apache.pig.builtin.OrcStorage;
 import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.builtin.RoundRobinPartitioner;
 import org.apache.pig.builtin.mock.Storage;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
@@ -109,12 +108,6 @@ public class UnionOptimizer extends TezO
         if((tezOp.isLimit() || tezOp.isLimitAfterSort()) && tezOp.getRequestedParallelism() == 1) {
             return false;
         }
-
-        // If user has specified a PARALLEL clause with the union operator
-        // turn off union optimization
-        if (tezOp.getRequestedParallelism() != -1) {
-            return false;
-        }
         // Two vertices separately ranking with 1 to n and writing to output directly
         // will make each rank repeat twice which is wrong. Rank always needs to be
         // done from a single vertex to have the counting correct.
@@ -127,25 +120,10 @@ public class UnionOptimizer extends TezO
     public static boolean isOptimizableStoreFunc(TezOperator tezOp,
             List<String> supportedStoreFuncs, List<String> unsupportedStoreFuncs)
             throws VisitorException {
-        List<POStoreTez> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class);
-
-        for (POStoreTez store : stores) {
-            String name = store.getStoreFunc().getClass().getName();
-            if (store.getStoreFunc() instanceof StoreFunc) {
-                StoreFunc func = (StoreFunc) store.getStoreFunc();
-                if (func.supportsParallelWriteToStoreLocation() != null) {
-                    if (func.supportsParallelWriteToStoreLocation()) {
-                        continue;
-                    } else {
-                        LOG.warn(name + " does not support union optimization."
-                                + " Disabling it. There will be some performance degradation.");
-                        return false;
-                    }
-                }
-            }
-            // If StoreFunc does not explicitly state support, then check supported and
-            // unsupported config settings.
-            if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) {
+        if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) {
+            List<POStoreTez> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class);
+            for (POStoreTez store : stores) {
+                String name = store.getStoreFunc().getClass().getName();
                 if (unsupportedStoreFuncs != null
                         && unsupportedStoreFuncs.contains(name)) {
                     return false;
@@ -259,23 +237,8 @@ public class UnionOptimizer extends TezO
                 for (TezOperator succ : successors) {
                     if (succ.isVertexGroup() && unionStoreOutputs.get(i).getSFile().equals(succ.getVertexGroupInfo().getSFile())) {
                         existingVertexGroup = succ;
-                        break;
-                    }
-                }
-            }
-            if (existingVertexGroup == null) {
-                // In the case of union + split + union + store, the different stores in the Split
-                // will be writing to same location after second union operator is optimized.
-                // So while optimizing the first union, we should just make it write to one vertex group
-                for (int j = 0; j < i; j++) {
-                    if (unionStoreOutputs.get(i).getSFile().equals(storeVertexGroupOps[j].getVertexGroupInfo().getSFile())) {
-                        storeVertexGroupOps[i] = storeVertexGroupOps[j];
-                        break;
                     }
                 }
-                if (storeVertexGroupOps[i] != null) {
-                    continue;
-                }
             }
             if (existingVertexGroup != null) {
                 storeVertexGroupOps[i] = existingVertexGroup;
@@ -307,15 +270,6 @@ public class UnionOptimizer extends TezO
         TezOperator[] outputVertexGroupOps = new TezOperator[unionOutputKeys.size()];
         String[] newOutputKeys = new String[unionOutputKeys.size()];
         for (int i=0; i < outputVertexGroupOps.length; i++) {
-            for (int j = 0; j < i; j++) {
-                if (unionOutputKeys.get(i).equals(unionOutputKeys.get(j))) {
-                    outputVertexGroupOps[i] = outputVertexGroupOps[j];
-                    break;
-                }
-            }
-            if (outputVertexGroupOps[i] != null) {
-                continue;
-            }
             outputVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope));
             outputVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo());
             outputVertexGroupOps[i].getVertexGroupInfo().setOutput(unionOutputKeys.get(i));
@@ -561,24 +515,15 @@ public class UnionOptimizer extends TezO
         // Connect predecessor to the storeVertexGroups
         int i = 0;
         for (TezOperator storeVertexGroup : storeVertexGroupOps) {
-            // Skip connecting if they are already connected. Can happen in case of
-            // union + split + union + store. Because of the split all the stores
-            // will be writing to same location
-            List<OperatorKey> inputs = storeVertexGroup.getVertexGroupInfo().getInputs();
-            if (inputs == null || !inputs.contains(pred.getOperatorKey())) {
-                tezPlan.connect(pred, storeVertexGroup);
-            }
             storeVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
             pred.addVertexGroupStore(clonedUnionStoreOutputs.get(i++).getOperatorKey(),
                     storeVertexGroup.getOperatorKey());
+            tezPlan.connect(pred, storeVertexGroup);
         }
 
         for (TezOperator outputVertexGroup : outputVertexGroupOps) {
-            List<OperatorKey> inputs = outputVertexGroup.getVertexGroupInfo().getInputs();
-            if (inputs == null || !inputs.contains(pred.getOperatorKey())) {
-                tezPlan.connect(pred, outputVertexGroup);
-            }
             outputVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
+            tezPlan.connect(pred, outputVertexGroup);
         }
 
         copyOperatorProperties(pred, unionOp);
@@ -623,7 +568,7 @@ public class UnionOptimizer extends TezO
             // more union predecessors. Change it to SCATTER_GATHER
             if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) {
                 edge.dataMovementType = DataMovementType.SCATTER_GATHER;
-                edge.partitionerClass = HashValuePartitioner.class;
+                edge.partitionerClass = RoundRobinPartitioner.class;
                 edge.outputClassName = UnorderedPartitionedKVOutput.class.getName();
                 edge.inputClassName = UnorderedKVInput.class.getName();
             }

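Besides dropping the StoreFunc capability check and the PARALLEL-clause guard, the edge rewrite above now uses RoundRobinPartitioner instead of HashValuePartitioner when a ONE_TO_ONE edge has to become SCATTER_GATHER. A minimal sketch of what a round-robin partitioner does, assuming the standard Hadoop Partitioner contract (org.apache.pig.builtin.RoundRobinPartitioner is the class actually wired in above and may differ in detail):

    import org.apache.hadoop.mapreduce.Partitioner;

    // Illustrative sketch, not the Pig implementation: the key is ignored and
    // records are cycled across partitions so union output is spread evenly.
    public class RoundRobinSketchPartitioner<K, V> extends Partitioner<K, V> {
        private int next = -1;

        @Override
        public int getPartition(K key, V value, int numPartitions) {
            next = (next + 1) % numPartitions;
            return next;
        }
    }
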
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java Fri Feb 24 03:34:37 2017
@@ -17,25 +17,23 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
 
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
-import org.apache.tez.dag.api.event.VertexState;
-import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.app.dag.impl.ScatterGatherEdgeManager;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**
@@ -48,13 +46,8 @@ import com.google.common.collect.Lists;
 public class PartitionerDefinedVertexManager extends VertexManagerPlugin {
     private static final Log LOG = LogFactory.getLog(PartitionerDefinedVertexManager.class);
 
-    private volatile boolean parallelismSet;
+    private boolean isParallelismSet = false;
     private int dynamicParallelism = -1;
-    private int numConfiguredSources;
-    private int numSources = -1;
-    private volatile boolean configured;
-    private volatile boolean started;
-    private volatile boolean scheduled;
 
     public PartitionerDefinedVertexManager(VertexManagerPluginContext context) {
         super(context);
@@ -62,31 +55,7 @@ public class PartitionerDefinedVertexMan
 
     @Override
     public void initialize() {
-        // this will prevent vertex from starting until we notify we are done
-        getContext().vertexReconfigurationPlanned();
-        parallelismSet = false;
-        numConfiguredSources = 0;
-        configured = false;
-        started = false;
-        numSources = getContext().getInputVertexEdgeProperties().size();
-        // wait for sources and self to start
-        Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
-        for (String entry : edges.keySet()) {
-            getContext().registerForVertexStateUpdates(entry, EnumSet.of(VertexState.CONFIGURED));
-        }
-    }
-
-    @Override
-    public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate)
-            throws Exception {
-        numConfiguredSources++;
-        LOG.info("For vertex: " + getContext().getVertexName() + " Received configured signal from: "
-            + stateUpdate.getVertexName() + " numConfiguredSources: " + numConfiguredSources
-            + " needed: " + numSources);
-        Preconditions.checkState(numConfiguredSources <= numSources, "Vertex: " + getContext().getVertexName());
-        if (numConfiguredSources == numSources) {
-            configure();
-        }
+        // Nothing to do
     }
 
     @Override
@@ -104,9 +73,10 @@ public class PartitionerDefinedVertexMan
     public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception {
         // There could be multiple partition vertex sending VertexManagerEvent
         // Only need to setVertexParallelism once
-        if (parallelismSet) {
+        if (isParallelismSet) {
             return;
         }
+        isParallelismSet = true;
         // Need to distinguish from VertexManagerEventPayloadProto emitted by OrderedPartitionedKVOutput
         if (vmEvent.getUserPayload().limit()==4) {
             dynamicParallelism = vmEvent.getUserPayload().getInt();
@@ -126,50 +96,18 @@ public class PartitionerDefinedVertexMan
                     edgeManagers.put(entry.getKey(), edge);
                 }
                 getContext().reconfigureVertex(dynamicParallelism, null, edgeManagers);
-                parallelismSet = true;
-                configure();
-            }
-        }
-    }
-
-    private void configure() {
-        if(parallelismSet && (numSources == numConfiguredSources)) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Done reconfiguring vertex " + getContext().getVertexName());
             }
-            getContext().doneReconfiguringVertex();
-            configured = true;
-            trySchedulingTasks();
         }
     }
 
-    private synchronized void trySchedulingTasks() {
-        if (configured && started && !scheduled) {
-            LOG.info("Scheduling " + dynamicParallelism + " tasks for vertex " + getContext().getVertexName());
+    @Override
+    public void onVertexStarted(Map<String, List<Integer>> completions) {
+        if (dynamicParallelism != -1) {
             List<TaskWithLocationHint> tasksToStart = Lists.newArrayListWithCapacity(dynamicParallelism);
-            for (int i = 0; i < dynamicParallelism; ++i) {
+            for (int i=0; i<dynamicParallelism; ++i) {
                 tasksToStart.add(new TaskWithLocationHint(new Integer(i), null));
             }
             getContext().scheduleVertexTasks(tasksToStart);
-            scheduled = true;
         }
     }
-
-    @Override
-    public void onVertexStarted(Map<String, List<Integer>> completions) {
-        // This vertex manager will be getting the following calls
-        //   1) onVertexManagerEventReceived - Parallelism vertex manager event sent by sample aggregator vertex
-        //   2) onVertexStateUpdated - Vertex CONFIGURED status updates from
-        //       - Order by Partitioner vertex (1-1) in case of Order by
-        //       - Skewed Join Left Partitioner (1-1) and Right Input Vertices in case of SkewedJoin
-        //   3) onVertexStarted
-        // Calls 2) and 3) can happen in any order. So we should schedule tasks
-        // only after start is called and configuration is also complete
-        started = true;
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Vertex start received for " + getContext().getVertexName());
-        }
-        trySchedulingTasks();
-    }
-
 }

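The simplified vertex manager above keys off the size of the VertexManagerEvent payload: a payload whose limit() is exactly 4 bytes is taken to be the dynamic parallelism sent by the sampler vertex, and longer payloads (such as the protobuf payload emitted by OrderedPartitionedKVOutput) are ignored. A hedged sketch of that 4-byte convention using java.nio.ByteBuffer (helper names are illustrative, not part of the patch):

    import java.nio.ByteBuffer;

    // Illustrative helper: encode/decode the single int the manager expects.
    final class ParallelismPayload {
        static ByteBuffer encode(int parallelism) {
            ByteBuffer buf = ByteBuffer.allocate(4);
            buf.putInt(parallelism);
            buf.flip();                 // ready for reading on the receiving side
            return buf;
        }

        static int decode(ByteBuffer payload) {
            // Anything other than a 4-byte payload is not a parallelism event.
            return payload.limit() == 4 ? payload.getInt() : -1;
        }
    }
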
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java Fri Feb 24 03:34:37 2017
@@ -33,15 +33,15 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.TezEstimatedParallelismClearer;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
@@ -72,7 +72,7 @@ public class PigGraceShuffleVertexManage
             conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
             bytesPerTask = conf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                     InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
-            pc = (PigContext)ObjectSerializer.deserialize(conf.get(PigImplConstants.PIG_CONTEXT));
+            pc = (PigContext)ObjectSerializer.deserialize(conf.get("pig.pigContext"));
             tezPlan = (TezOperPlan)ObjectSerializer.deserialize(conf.get("pig.tez.plan"));
             TezEstimatedParallelismClearer clearer = new TezEstimatedParallelismClearer(tezPlan);
             try {
@@ -81,10 +81,9 @@ public class PigGraceShuffleVertexManage
                 throw new TezUncheckedException(e);
             }
             TezOperator op = tezPlan.getOperator(OperatorKey.fromString(getContext().getVertexName()));
-
+    
             // Collect grandparents of the vertex
-            Function<TezOperator, String> tezOpToString = new Function<TezOperator, String>() {
-                @Override
+            Function<TezOperator, String> tezOpToString = new Function<TezOperator, String>() { 
                 public String apply(TezOperator op) { return op.getOperatorKey().toString(); }
             };
             grandParents = Lists.transform(TezOperPlan.getGrandParentsForGraceParallelism(tezPlan, op), tezOpToString);
@@ -136,7 +135,7 @@ public class PigGraceShuffleVertexManage
         // Now one of the predecessor is about to start, we need to make a decision now
         if (anyPredAboutToStart) {
             // All grandparents finished, start parents with right parallelism
-
+            
             for (TezOperator pred : preds) {
                 if (pred.getRequestedParallelism()==-1) {
                     List<TezOperator> predPreds = tezPlan.getPredecessors(pred);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Fri Feb 24 03:34:37 2017
@@ -25,7 +25,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Properties;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -33,7 +32,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.log4j.PropertyConfigurator;
 import org.apache.pig.JVMReuseImpl;
 import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
@@ -41,7 +39,6 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ProgressableReporter;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
@@ -56,7 +53,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -136,11 +132,7 @@ public class PigProcessor extends Abstra
         SpillableMemoryManager.getInstance().configure(conf);
         PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer
                 .deserialize(conf.get("udf.import.list")));
-        Properties log4jProperties = (Properties) ObjectSerializer
-                .deserialize(conf.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
-        if (log4jProperties != null) {
-            PropertyConfigurator.configure(log4jProperties);
-        }
+        PigContext pc = (PigContext) ObjectSerializer.deserialize(conf.get("pig.pigContext"));
 
         // To determine front-end in UDFContext
         conf.set(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, getContext().getUniqueIdentifier());
@@ -159,12 +151,6 @@ public class PigProcessor extends Abstra
         conf.setInt(JobContext.TASK_PARTITION,
               taskAttemptId.getTaskID().getId());
         conf.set(JobContext.ID, taskAttemptId.getJobID().toString());
-        if (conf.get(PigInputFormat.PIG_INPUT_LIMITS) != null) {
-            // Has Load and is a root vertex
-            conf.setInt(JobContext.NUM_MAPS, getContext().getVertexParallelism());
-        } else {
-            conf.setInt(JobContext.NUM_REDUCES, getContext().getVertexParallelism());
-        }
 
         conf.set(PigConstants.TASK_INDEX, Integer.toString(getContext().getTaskIndex()));
         UDFContext.getUDFContext().addJobConf(conf);
@@ -172,7 +158,7 @@ public class PigProcessor extends Abstra
 
         String execPlanString = conf.get(PLAN);
         execPlan = (PhysicalPlan) ObjectSerializer.deserialize(execPlanString);
-        SchemaTupleBackend.initialize(conf);
+        SchemaTupleBackend.initialize(conf, pc);
         PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new org.apache.hadoop.mapreduce.JobID());
 
         // Set the job conf as a thread-local member of PigMapReduce
@@ -181,7 +167,7 @@ public class PigProcessor extends Abstra
 
         Utils.setDefaultTimeZone(conf);
 
-        boolean aggregateWarning = "true".equalsIgnoreCase(conf.get("aggregate.warning"));
+        boolean aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
         PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
         pigStatusReporter.setContext(new TezTaskContext(getContext()));
         pigHadoopLogger = PigHadoopLogger.getInstance();

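The hunks above go back to deserializing the whole PigContext from the job configuration under the "pig.pigContext" key and reading settings such as aggregate.warning from it, rather than from individual conf entries. A hedged sketch of the round trip this relies on (the helper class is illustrative; ObjectSerializer and PigContext are the Pig classes used above):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.impl.PigContext;
    import org.apache.pig.impl.util.ObjectSerializer;

    // Illustrative helper: the front end serializes the PigContext into the
    // conf as a string, and the task side reads it back, as the processor
    // does above.
    final class PigContextConf {
        static void store(Configuration conf, PigContext pc) throws IOException {
            conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
        }

        static PigContext load(Configuration conf) throws IOException {
            return (PigContext) ObjectSerializer.deserialize(conf.get("pig.pigContext"));
        }
    }
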
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java Fri Feb 24 03:34:37 2017
@@ -43,15 +43,6 @@ public interface TezInput {
      */
     public void addInputsToSkip(Set<String> inputsToSkip);
 
-    /**
-     * Attach the inputs to the operator. Also ensure reader.next() is called to force fetch
-     * the input so that all inputs are fetched and memory released before memory is allocated
-     * for outputs
-     *
-     * @param inputs available inputs
-     * @param conf configuration
-     * @throws ExecException
-     */
     public void attachInputs(Map<String, LogicalInput> inputs,
             Configuration conf) throws ExecException;
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java Fri Feb 24 03:34:37 2017
@@ -23,7 +23,6 @@ import java.util.Map.Entry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
 import org.apache.pig.data.DataBag;
@@ -31,7 +30,6 @@ import org.apache.pig.data.InternalMap;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.util.UDFContext;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 
 public class WeightedRangePartitionerTez extends WeightedRangePartitioner {
@@ -66,13 +64,11 @@ public class WeightedRangePartitionerTez
             InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
             estimatedNumPartitions = (Integer)quantileMap.get(PigProcessor.ESTIMATED_NUM_PARALLELISM);
             convertToArray(quantilesList);
-            long taskIdHashCode = UDFContext.getUDFContext().getJobConf().get(JobContext.TASK_ID).hashCode();
-            long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL);
             for (Entry<Object, Object> ent : weightedPartsData.entrySet()) {
                 Tuple key = (Tuple) ent.getKey(); // sample item which repeats
                 float[] probVec = getProbVec((Tuple) ent.getValue());
                 weightedParts.put(getPigNullableWritable(key),
-                        new DiscreteProbabilitySampleGenerator(randomSeed, probVec));
+                        new DiscreteProbabilitySampleGenerator(probVec));
             }
         } catch (Exception e) {
             throw new RuntimeException(e);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java Fri Feb 24 03:34:37 2017
@@ -50,7 +50,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoDisk;
@@ -103,6 +102,7 @@ public class MRToTezHelper {
         mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_SPECULATIVE, TezConfiguration.TEZ_AM_SPECULATION_ENABLED);
         mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL);
         mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.reduce.limit", "tez.am.vertex.max-task-concurrency");
+        mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.map.limit", "tez.am.vertex.max-task-concurrency");
         mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.am.progress.stuck.interval-ms");
     }
 
@@ -165,7 +165,11 @@ public class MRToTezHelper {
                     continue;
                 }
             }
-            if (key.startsWith("yarn.nodemanager")) {
+            if (key.startsWith("dfs.datanode")) {
+                tezConf.unset(key);
+            } else if (key.startsWith("dfs.namenode")) {
+                tezConf.unset(key);
+            } else if (key.startsWith("yarn.nodemanager")) {
                 tezConf.unset(key);
             } else if (key.startsWith("mapreduce.jobhistory")) {
                 tezConf.unset(key);
@@ -177,15 +181,20 @@ public class MRToTezHelper {
         }
     }
 
-    public static void translateMRSettingsForTezAM(TezConfiguration dagAMConf) {
+    public static TezConfiguration getDAGAMConfFromMRConf(
+            Configuration tezConf) {
+
+        // Set Tez parameters based on MR parameters.
+        TezConfiguration dagAMConf = new TezConfiguration(tezConf);
+
 
         convertMRToTezConf(dagAMConf, dagAMConf, DeprecatedKeys.getMRToDAGParamMap());
         convertMRToTezConf(dagAMConf, dagAMConf, mrAMParamToTezAMParamMap);
 
-        String env = dagAMConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV);
-        if (dagAMConf.get(MRJobConfig.MR_AM_ENV) != null) {
-            env = (env == null) ? dagAMConf.get(MRJobConfig.MR_AM_ENV)
-                                : env + "," + dagAMConf.get(MRJobConfig.MR_AM_ENV);
+        String env = tezConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV);
+        if (tezConf.get(MRJobConfig.MR_AM_ENV) != null) {
+            env = (env == null) ? tezConf.get(MRJobConfig.MR_AM_ENV)
+                                : env + "," + tezConf.get(MRJobConfig.MR_AM_ENV);
         }
 
         if (env != null) {
@@ -194,23 +203,24 @@ public class MRToTezHelper {
 
         dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
                 org.apache.tez.mapreduce.hadoop.MRHelpers
-                        .getJavaOptsForMRAM(dagAMConf));
+                        .getJavaOptsForMRAM(tezConf));
 
-        String queueName = dagAMConf.get(JobContext.QUEUE_NAME,
+        String queueName = tezConf.get(JobContext.QUEUE_NAME,
                 YarnConfiguration.DEFAULT_QUEUE_NAME);
         dagAMConf.setIfUnset(TezConfiguration.TEZ_QUEUE_NAME, queueName);
 
         dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_VIEW_ACLS,
-                dagAMConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
+                tezConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
 
         dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MODIFY_ACLS,
-                dagAMConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
+                tezConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
 
         // Hardcoding at AM level instead of setting per vertex till TEZ-2710 is available
         dagAMConf.setIfUnset(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION, "0.5");
 
         removeUnwantedSettings(dagAMConf, true);
 
+        return dagAMConf;
     }
 
     /**
@@ -253,14 +263,6 @@ public class MRToTezHelper {
         JobControlCompiler.configureCompression(tezConf);
         convertMRToTezConf(tezConf, mrConf, DeprecatedKeys.getMRToTezRuntimeParamMap());
         removeUnwantedSettings(tezConf, false);
-
-        // ShuffleVertexManager Plugin settings
-        // DeprecatedKeys.getMRToTezRuntimeParamMap() only translates min and not max
-        String slowStartFraction = mrConf.get(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART);
-        if (slowStartFraction != null) {
-            tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, slowStartFraction);
-            tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, slowStartFraction);
-        }
     }
 
     /**

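Among the MRToTezHelper changes above, getDAGAMConfFromMRConf is restored to build a TezConfiguration for the DAG AM from the MR-style configuration, including merging the MR admin user env and the MR AM env into one comma-separated value. A hedged sketch of just that merge rule (method name is illustrative):

    // Illustrative only: admin env first, then the AM env, joined with a comma
    // when both are present; null when neither is set.
    static String mergeAmEnv(String adminUserEnv, String amEnv) {
        if (amEnv == null) {
            return adminUserEnv;
        }
        return (adminUserEnv == null) ? amEnv : adminUserEnv + "," + amEnv;
    }
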
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Fri Feb 24 03:34:37 2017
@@ -36,14 +36,13 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POFRJoinTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
+import org.apache.pig.builtin.RoundRobinPartitioner;
 import org.apache.pig.builtin.TOBAG;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.TupleFactory;
@@ -199,8 +198,8 @@ public class TezCompilerUtil {
 
     public static boolean isNonPackageInput(String inputKey, TezOperator tezOp) throws PlanException {
         try {
-            List<POFRJoinTez> inputs = PlanHelper.getPhysicalOperators(tezOp.plan, POFRJoinTez.class);
-            for (POFRJoinTez input : inputs) {
+            List<TezInput> inputs = PlanHelper.getPhysicalOperators(tezOp.plan, TezInput.class);
+            for (TezInput input : inputs) {
                 if (ArrayUtils.contains(input.getTezInputs(), inputKey)) {
                     return true;
                 }
@@ -270,7 +269,7 @@ public class TezCompilerUtil {
         } else if (dataMovementType == DataMovementType.SCATTER_GATHER) {
             edge.outputClassName = UnorderedPartitionedKVOutput.class.getName();
             edge.inputClassName = UnorderedKVInput.class.getName();
-            edge.partitionerClass = HashValuePartitioner.class;
+            edge.partitionerClass = RoundRobinPartitioner.class;
         }
         edge.setIntermediateOutputKeyClass(POValueOutputTez.EmptyWritable.class.getName());
         edge.setIntermediateOutputValueClass(TUPLE_CLASS);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Fri Feb 24 03:34:37 2017
@@ -70,7 +70,7 @@ public class MapRedUtil {
     private static Log log = LogFactory.getLog(MapRedUtil.class);
     private static final TupleFactory tf = TupleFactory.getInstance();
 
-    public static final String FILE_SYSTEM_NAME = FileSystem.FS_DEFAULT_NAME_KEY;
+    public static final String FILE_SYSTEM_NAME = "fs.default.name";
 
     /**
      * Loads the key distribution sampler file
@@ -301,7 +301,7 @@ public class MapRedUtil {
     /**
      * Returns the total number of bytes for this file, or if a directory all
      * files in the directory.
-     *
+     * 
      * @param fs FileSystem
      * @param status FileStatus
      * @param max Maximum value of total length that will trigger exit. Many

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Fri Feb 24 03:34:37 2017
@@ -18,6 +18,7 @@ package org.apache.pig.backend.hadoop.hb
 
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -64,7 +65,6 @@ import org.apache.hadoop.hbase.mapreduce
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableSplit;
-import org.apache.hadoop.hbase.security.token.TokenUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
@@ -86,6 +86,7 @@ import org.apache.pig.ResourceSchema.Res
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.StoreResources;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder;
 import org.apache.pig.builtin.FuncUtils;
 import org.apache.pig.builtin.Utf8StorageConverter;
@@ -596,9 +597,7 @@ public class HBaseStorage extends LoadFu
                             new BinaryComparator(colInfo.getColumnName())));
                 }
             }
-            if (columnFilters.getFilters().size() != 0) {
-                thisColumnGroupFilter.addFilter(columnFilters);
-            }
+            thisColumnGroupFilter.addFilter(columnFilters);
             allColumnFilters.addFilter(thisColumnGroupFilter);
         }
         if (allColumnFilters != null) {
@@ -793,35 +792,46 @@ public class HBaseStorage extends LoadFu
     public List<String> getShipFiles() {
         // Depend on HBase to do the right thing when available, as of HBASE-9165
         try {
-            Configuration conf = new Configuration();
-            TableMapReduceUtil.addHBaseDependencyJars(conf);
-            if (conf.get("tmpjars") != null) {
-                String[] tmpjars = conf.getStrings("tmpjars");
-                List<String> shipFiles = new ArrayList<String>(tmpjars.length);
-                for (String tmpjar : tmpjars) {
-                    shipFiles.add(new URL(tmpjar).getPath());
+            Method addHBaseDependencyJars =
+              TableMapReduceUtil.class.getMethod("addHBaseDependencyJars", Configuration.class);
+            if (addHBaseDependencyJars != null) {
+                Configuration conf = new Configuration();
+                addHBaseDependencyJars.invoke(null, conf);
+                if (conf.get("tmpjars") != null) {
+                    String[] tmpjars = conf.getStrings("tmpjars");
+                    List<String> shipFiles = new ArrayList<String>(tmpjars.length);
+                    for (String tmpjar : tmpjars) {
+                        shipFiles.add(new URL(tmpjar).getPath());
+                    }
+                    return shipFiles;
                 }
-                return shipFiles;
-            }
-        } catch (IOException e) {
-            if(e instanceof MalformedURLException){
-                LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars"
-                        + " had malformed url. Falling back to previous logic.", e);
-            }else {
-                LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
-                        + " failed. Falling back to previous logic.", e);
             }
+        } catch (NoSuchMethodException e) {
+            LOG.debug("TableMapReduceUtils#addHBaseDependencyJars not available."
+              + " Falling back to previous logic.", e);
+        } catch (IllegalAccessException e) {
+            LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
+              + " not permitted. Falling back to previous logic.", e);
+        } catch (InvocationTargetException e) {
+            LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
+              + " failed. Falling back to previous logic.", e);
+        } catch (MalformedURLException e) {
+            LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars"
+                    + " had malformed url. Falling back to previous logic.", e);
         }
 
         List<Class> classList = new ArrayList<Class>();
         classList.add(org.apache.hadoop.hbase.client.HTable.class); // main hbase jar or hbase-client
         classList.add(org.apache.hadoop.hbase.mapreduce.TableSplit.class); // main hbase jar or hbase-server
+        if (!HadoopShims.isHadoopYARN()) { //Avoid shipping duplicate. Hadoop 0.23/2 itself has guava
+            classList.add(com.google.common.collect.Lists.class); // guava
+        }
         classList.add(org.apache.zookeeper.ZooKeeper.class); // zookeeper
         // Additional jars that are specific to v0.95.0+
         addClassToList("org.cloudera.htrace.Trace", classList); // htrace
         addClassToList("org.apache.hadoop.hbase.protobuf.generated.HBaseProtos", classList); // hbase-protocol
         addClassToList("org.apache.hadoop.hbase.TableName", classList); // hbase-common
-        addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compat
+        addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compar
         addClassToList("org.jboss.netty.channel.ChannelFactory", classList); // netty
         return FuncUtils.getShipFiles(classList);
     }
@@ -872,13 +882,27 @@ public class HBaseStorage extends LoadFu
         }
 
         if ("kerberos".equalsIgnoreCase(hbaseConf.get(HBASE_SECURITY_CONF_KEY))) {
+            // Will not be entering this block for 0.20.2 as it has no security.
             try {
-                UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
-                if (currentUser.hasKerberosCredentials()) {
-                    TokenUtil.obtainTokenForJob(hbaseConf,currentUser,job);
+                // getCurrentUser method is not public in 0.20.2
+                Method m1 = UserGroupInformation.class.getMethod("getCurrentUser");
+                UserGroupInformation currentUser = (UserGroupInformation) m1.invoke(null,(Object[]) null);
+                // hasKerberosCredentials method not available in 0.20.2
+                Method m2 = UserGroupInformation.class.getMethod("hasKerberosCredentials");
+                boolean hasKerberosCredentials = (Boolean) m2.invoke(currentUser, (Object[]) null);
+                if (hasKerberosCredentials) {
+                    // Class and method are available only from 0.92 security release
+                    Class tokenUtilClass = Class
+                            .forName("org.apache.hadoop.hbase.security.token.TokenUtil");
+                    Method m3 = tokenUtilClass.getMethod("obtainTokenForJob", new Class[] {
+                            Configuration.class, UserGroupInformation.class, Job.class });
+                    m3.invoke(null, new Object[] { hbaseConf, currentUser, job });
                 } else {
                     LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available");
                 }
+            } catch (ClassNotFoundException cnfe) {
+                throw new RuntimeException("Failure loading TokenUtil class, "
+                        + "is secure RPC available?", cnfe);
             } catch (RuntimeException re) {
                 throw re;
             } catch (Exception e) {

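Both HBaseStorage hunks above reintroduce reflection so one binary can run against HBase/Hadoop versions that may or may not expose a given API (TableMapReduceUtil.addHBaseDependencyJars, UserGroupInformation.hasKerberosCredentials, TokenUtil.obtainTokenForJob). A hedged sketch of that guard pattern in isolation (class name and fallback behaviour are illustrative):

    import java.lang.reflect.Method;

    import org.apache.hadoop.conf.Configuration;

    // Illustrative sketch: invoke the method if this classpath has it,
    // otherwise tell the caller to fall back to the older hand-maintained logic.
    final class HBaseDependencyJarsGuard {
        static boolean tryAddDependencyJars(Configuration conf) {
            try {
                Method m = Class.forName("org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil")
                        .getMethod("addHBaseDependencyJars", Configuration.class);
                m.invoke(null, conf);   // static method, so no target instance
                return true;
            } catch (ReflectiveOperationException e) {
                return false;           // class or method not available here
            }
        }
    }
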
Modified: pig/branches/spark/src/org/apache/pig/builtin/Bloom.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/Bloom.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/Bloom.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/Bloom.java Fri Feb 24 03:34:37 2017
@@ -35,7 +35,6 @@ import org.apache.pig.FilterFunc;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 
 /**
  * Use a Bloom filter built previously by BuildBloom.  You would first
@@ -55,36 +54,14 @@ import org.apache.pig.data.TupleFactory;
  * C = filter B by bloom(z);
  * D = join C by z, A by x;
  * It uses {@link org.apache.hadoop.util.bloom.BloomFilter}.
- *
- * You can also pass the Bloom filter from BuildBloom directly to Bloom UDF
- * as a scalar instead of storing it to file and loading again. This is simpler
- * if the Bloom filter will not be reused and needs to be discarded after the
- * run of the script.
- *
- * define bb BuildBloom('jenkins', '100', '0.1');
- * A = load 'foo' as (x, y);
- * B = group A all;
- * C = foreach B generate bb(A.x) as bloomfilter;
- * D = load 'bar' as (z);
- * E = filter D by Bloom(C.bloomfilter, z);
- * F = join E by z, A by x;
  */
 public class Bloom extends FilterFunc {
 
-    private static TupleFactory mTupleFactory = TupleFactory.getInstance();
-
     private String bloomFile;
-    private BloomFilter filter = null;
+    public BloomFilter filter = null;
 
-    public Bloom() {
-    }
-
-    /**
-     * The filename containing the serialized Bloom filter. If filename is null
-     * or the no-arg constructor is used, then the bloomfilter bytearray which
-     * is the output of BuildBloom should be passed as the first argument to the UDF
-     *
-     * @param filename  file containing the serialized Bloom filter
+    /** 
+     * @param filename file containing the serialized Bloom filter
      */
     public Bloom(String filename) {
         bloomFile = filename;
@@ -93,25 +70,11 @@ public class Bloom extends FilterFunc {
     @Override
     public Boolean exec(Tuple input) throws IOException {
         if (filter == null) {
-            init(input);
+            init();
         }
         byte[] b;
-        if (bloomFile == null) {
-            // The first one is the bloom filter. Skip that
-            if (input.size() == 2) {
-                b = DataType.toBytes(input.get(1));
-            } else {
-                List<Object> inputList = input.getAll();
-                Tuple tuple = mTupleFactory.newTupleNoCopy(inputList.subList(1, inputList.size()));
-                b = DataType.toBytes(tuple, DataType.TUPLE);
-            }
-        } else {
-            if (input.size() == 1) {
-                b = DataType.toBytes(input.get(0));
-            } else {
-                b = DataType.toBytes(input, DataType.TUPLE);
-            }
-        }
+        if (input.size() == 1) b = DataType.toBytes(input.get(0));
+        else b = DataType.toBytes(input, DataType.TUPLE);
 
         Key k = new Key(b);
         return filter.membershipTest(k);
@@ -119,46 +82,34 @@ public class Bloom extends FilterFunc {
 
     @Override
     public List<String> getCacheFiles() {
-        if (bloomFile != null) {
-            List<String> list = new ArrayList<String>(1);
-            // We were passed the name of the file on HDFS.  Append a
-            // name for the file on the task node.
-            try {
-                list.add(bloomFile + "#" + getFilenameFromPath(bloomFile));
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-            return list;
+        List<String> list = new ArrayList<String>(1);
+        // We were passed the name of the file on HDFS.  Append a
+        // name for the file on the task node.
+        try {
+            list.add(bloomFile + "#" + getFilenameFromPath(bloomFile));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
         }
-        return null;
+        return list;
     }
 
-    private void init(Tuple input) throws IOException {
-        if (bloomFile == null) {
-            if (input.get(0) instanceof DataByteArray) {
-                filter = BuildBloomBase.bloomIn((DataByteArray) input.get(0));
-            } else {
-                throw new IllegalArgumentException("The first argument to the Bloom UDF should be"
-                        + " the bloom filter if a bloom file is not specified in the constructor");
-            }
-        } else {
-            filter = new BloomFilter();
-            String dir = "./" + getFilenameFromPath(bloomFile);
-            String[] partFiles = new File(dir)
-                    .list(new FilenameFilter() {
-                        @Override
-                        public boolean accept(File current, String name) {
-                            return name.startsWith("part");
-                        }
-                    });
-
-            String dcFile = dir + "/" + partFiles[0];
-            DataInputStream dis = new DataInputStream(new FileInputStream(dcFile));
-            try {
-                filter.readFields(dis);
-            } finally {
-                dis.close();
-            }
+    private void init() throws IOException {
+        filter = new BloomFilter();
+        String dir = "./" + getFilenameFromPath(bloomFile);
+        String[] partFiles = new File(dir)
+                .list(new FilenameFilter() {
+                    @Override
+                    public boolean accept(File current, String name) {
+                        return name.startsWith("part");
+                    }
+                });
+
+        String dcFile = dir + "/" + partFiles[0];
+        DataInputStream dis = new DataInputStream(new FileInputStream(dcFile));
+        try {
+            filter.readFields(dis);
+        } finally {
+            dis.close();
         }
     }
 

Modified: pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java Fri Feb 24 03:34:37 2017
@@ -18,15 +18,16 @@
 
 package org.apache.pig.builtin;
 
+import java.io.IOException;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.hadoop.util.bloom.BloomFilter;
 import org.apache.hadoop.util.hash.Hash;
+
 import org.apache.pig.EvalFunc;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
@@ -46,7 +47,7 @@ public abstract class BuildBloomBase<T>
     protected BuildBloomBase() {
     }
 
-    /**
+    /** 
      * @param hashType type of the hashing function (see
      * {@link org.apache.hadoop.util.hash.Hash}).
      * @param mode Will be ignored, though by convention it should be
@@ -63,7 +64,7 @@ public abstract class BuildBloomBase<T>
         hType = convertHashType(hashType);
     }
 
-    /**
+    /** 
      * @param hashType type of the hashing function (see
      * {@link org.apache.hadoop.util.hash.Hash}).
      * @param numElements The number of distinct elements expected to be
@@ -103,7 +104,7 @@ public abstract class BuildBloomBase<T>
         return new DataByteArray(baos.toByteArray());
     }
 
-    public static BloomFilter bloomIn(DataByteArray b) throws IOException {
+    protected BloomFilter bloomIn(DataByteArray b) throws IOException {
         DataInputStream dis = new DataInputStream(new
             ByteArrayInputStream(b.get()));
         BloomFilter f = new BloomFilter();

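A companion sketch for the BuildBloomBase hunk: the bloomOut()/bloomIn() pair round-trips a Hadoop BloomFilter through a Pig DataByteArray. This only approximates the methods shown in the diff; the vector size, hash count and hash type below are arbitrary example values.

// Illustrative round trip, not part of the commit.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;
import org.apache.pig.data.DataByteArray;

public class BloomRoundTrip {
    public static void main(String[] args) throws IOException {
        BloomFilter original = new BloomFilter(1024, 3, Hash.JENKINS_HASH);
        original.add(new Key("hello".getBytes("UTF-8")));

        // Serialize into a DataByteArray, as bloomOut() does.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        original.write(new DataOutputStream(baos));
        DataByteArray dba = new DataByteArray(baos.toByteArray());

        // Deserialize again, as bloomIn() does.
        BloomFilter copy = new BloomFilter();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(dba.get())));
        System.out.println(copy.membershipTest(new Key("hello".getBytes("UTF-8"))));
    }
}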
Modified: pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java Fri Feb 24 03:34:37 2017
@@ -37,7 +37,6 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.shims.Hadoop23Shims;
 import org.apache.hadoop.hive.shims.HadoopShimsSecure;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.Counters;
@@ -181,9 +180,20 @@ abstract class HiveUDFBase extends EvalF
 
     @Override
     public List<String> getShipFiles() {
+        String hadoopVersion = "20S";
+        if (Utils.isHadoop23() || Utils.isHadoop2()) {
+            hadoopVersion = "23";
+        }
+        Class hadoopVersionShimsClass;
+        try {
+            hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" +
+                    hadoopVersion + "Shims");
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath");
+        }
         List<String> files = FuncUtils.getShipFiles(new Class[] {GenericUDF.class,
-                PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class,
-                Hadoop23Shims.class, HadoopShimsSecure.class, Collector.class});
+                PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class, 
+                hadoopVersionShimsClass, HadoopShimsSecure.class, Collector.class});
         return files;
     }
 

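The HiveUDFBase change above swaps a compile-time reference to Hadoop23Shims for a Class.forName lookup keyed on the detected Hadoop version. A hedged sketch of that idea follows, plus one common way to locate the jar that ships the resolved class; the CodeSource lookup is an assumption about how ship-file helpers generally work, not a description of FuncUtils.getShipFiles.

// Illustrative sketch, not part of the commit.
public class ShimJarLocator {
    public static void main(String[] args) throws Exception {
        String hadoopVersion = "23";   // e.g. "20S" on pre-YARN Hadoop
        // Resolve the version-specific shims class at runtime.
        Class<?> shims = Class.forName(
                "org.apache.hadoop.hive.shims.Hadoop" + hadoopVersion + "Shims");
        // One way to find the jar that contains the resolved class.
        String jar = shims.getProtectionDomain().getCodeSource()
                          .getLocation().getPath();
        System.out.println(shims.getName() + " -> " + jar);
    }
}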
Modified: pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java Fri Feb 24 03:34:37 2017
@@ -56,7 +56,6 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.hive.shims.Hadoop23Shims;
 import org.apache.hadoop.hive.shims.HadoopShimsSecure;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
@@ -390,8 +389,20 @@ public class OrcStorage extends LoadFunc
 
     @Override
     public List<String> getShipFiles() {
+        List<String> cacheFiles = new ArrayList<String>();
+        String hadoopVersion = "20S";
+        if (Utils.isHadoop23() || Utils.isHadoop2()) {
+            hadoopVersion = "23";
+        }
+        Class hadoopVersionShimsClass;
+        try {
+            hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" +
+                    hadoopVersion + "Shims");
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath");
+        }
         Class[] classList = new Class[] {OrcFile.class, HiveConf.class, AbstractSerDe.class,
-                org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, Hadoop23Shims.class,
+                org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, hadoopVersionShimsClass,
                 Input.class};
         return FuncUtils.getShipFiles(classList);
     }
@@ -445,7 +456,7 @@ public class OrcStorage extends LoadFunc
     }
 
     private TypeInfo getTypeInfoFromLocation(String location, Job job) throws IOException {
-        FileSystem fs = FileSystem.get(new Path(location).toUri(), job.getConfiguration());
+        FileSystem fs = FileSystem.get(job.getConfiguration());
         Path path = getFirstFile(location, fs, new NonEmptyOrcFileFilter(fs));
         if (path == null) {
             log.info("Cannot find any ORC files from " + location +

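The last OrcStorage hunk replaces FileSystem.get(new Path(location).toUri(), conf) with FileSystem.get(conf). A small sketch of the difference between the two calls; the URIs used are made-up examples and the code is not part of the commit.

// FileSystem.get(conf) returns the default filesystem from fs.defaultFS,
// while FileSystem.get(uri, conf) resolves the scheme and authority of the
// given location (e.g. a different namenode or object store).
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsResolution {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem defaultFs = FileSystem.get(conf);
        FileSystem locationFs =
                FileSystem.get(new Path("hdfs://nn2/data/orc").toUri(), conf);
        System.out.println(defaultFs.getUri() + " vs " + locationFs.getUri());
    }
}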
Modified: pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java Fri Feb 24 03:34:37 2017
@@ -68,6 +68,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.bzip2r.Bzip2TextInputFormat;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
@@ -170,7 +171,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
         validOptions.addOption(TAG_SOURCE_FILE, false, "Appends input source file name to beginning of each tuple.");
         validOptions.addOption(TAG_SOURCE_PATH, false, "Appends input source file path to beginning of each tuple.");
         validOptions.addOption("tagsource", false, "Appends input source file name to beginning of each tuple.");
-        Option overwrite = new Option("overwrite", "Overwrites the destination.");
+        Option overwrite = new Option(" ", "Overwrites the destination.");
         overwrite.setLongOpt("overwrite");
         overwrite.setOptionalArg(true);
         overwrite.setArgs(1);
@@ -411,7 +412,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
     @Override
     public InputFormat getInputFormat() {
         if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz"))
-           && (!bzipinput_usehadoops) ) {
+           && (!bzipinput_usehadoops || !HadoopShims.isHadoopYARN()) ) {
             mLog.info("Using Bzip2TextInputFormat");
             return new Bzip2TextInputFormat();
         } else {

Modified: pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java Fri Feb 24 03:34:37 2017
@@ -17,63 +17,15 @@
  */
 package org.apache.pig.builtin;
 
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Partitioner;
 
-/**
- * This partitioner should be used with extreme caution and only in cases
- * where the order of output records is guaranteed to be same. If the order of
- * output records can vary on retries which is mostly the case, map reruns
- * due to shuffle fetch failures can lead to data being partitioned differently
- * and result in incorrect output due to loss or duplication of data.
- * Refer PIG-5041 for more details.
- *
- * This will be removed in the next release as it is risky to use in most cases.
- */
-@Deprecated
-public class RoundRobinPartitioner extends Partitioner<Writable, Writable>
-        implements Configurable {
-
-    /**
-     * Batch size for round robin partitioning. Batch size number of records
-     * will be distributed to each partition in a round robin fashion. Default
-     * value is 0 which distributes each record in a circular fashion. Higher
-     * number for batch size can be used to increase probability of keeping
-     * similar records in the same partition if output is already sorted and get
-     * better compression.
-     */
-    public static String PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE = "pig.round.robin.partitioner.batch.size";
-    private int num = -1;
-    private int batchSize = 0;
-    private int currentBatchCount = 0;
-    private Configuration conf;
+public class RoundRobinPartitioner extends Partitioner<Writable, Writable> {
+    private int num = 0;
 
     @Override
     public int getPartition(Writable key, Writable value, int numPartitions) {
-        if (batchSize > 0) {
-            if (currentBatchCount == 0) {
-                num = ++num % numPartitions;
-            }
-            if (++currentBatchCount == batchSize) {
-                currentBatchCount = 0;
-            }
-        } else {
-            num = ++num % numPartitions;
-        }
+        num = ++num % numPartitions;
         return num;
     }
-
-    @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-        batchSize = conf.getInt(PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE, 0);
-    }
-
-    @Override
-    public Configuration getConf() {
-        return conf;
-    }
-
 }

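A tiny driver (not part of the commit) that traces how the simplified getPartition above distributes records: with num starting at 0 and three partitions, assignments cycle 1, 2, 0, 1, 2, 0, ... because num is pre-incremented before the modulo.

// Illustrative trace of the round-robin assignment.
public class RoundRobinDemo {
    public static void main(String[] args) {
        int num = 0;
        int numPartitions = 3;
        for (int record = 0; record < 6; record++) {
            num = ++num % numPartitions;   // same expression as getPartition()
            System.out.println("record " + record + " -> partition " + num);
        }
    }
}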
Modified: pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java Fri Feb 24 03:34:37 2017
@@ -37,6 +37,7 @@ import org.apache.pig.ResourceSchema.Res
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.bzip2r.Bzip2TextInputFormat;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
@@ -258,7 +259,8 @@ public class TextLoader extends LoadFunc
     @Override
     public InputFormat getInputFormat() {
         if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz"))
-                && !bzipinput_usehadoops ) {
+           && !HadoopShims.isHadoopYARN()
+           && !bzipinput_usehadoops ) {
             mLog.info("Using Bzip2TextInputFormat");
             return new Bzip2TextInputFormat();
         } else {

Modified: pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java Fri Feb 24 03:34:37 2017
@@ -423,7 +423,7 @@ public abstract class DefaultAbstractBag
     }
 
     @SuppressWarnings("rawtypes")
-    protected void warn(String msg, Enum warningEnum, Throwable e) {
+    protected void warn(String msg, Enum warningEnum, Exception e) {
         pigLogger = PhysicalOperator.getPigLogger();
         if(pigLogger != null) {
             pigLogger.warn(this, msg, warningEnum);

Modified: pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java Fri Feb 24 03:34:37 2017
@@ -22,11 +22,11 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.io.FileNotFoundException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,12 +42,12 @@ import org.apache.pig.PigWarning;
 public class DefaultDataBag extends DefaultAbstractBag {
 
     /**
-     *
+     * 
      */
     private static final long serialVersionUID = 2L;
 
     private static final Log log = LogFactory.getLog(DefaultDataBag.class);
-
+    
     private static final InterSedes SEDES = InterSedesFactory.getInterSedesInstance();
 
     public DefaultDataBag() {
@@ -70,12 +70,12 @@ public class DefaultDataBag extends Defa
     public boolean isSorted() {
         return false;
     }
-
+    
     @Override
     public boolean isDistinct() {
         return false;
     }
-
+    
     @Override
     public Iterator<Tuple> iterator() {
         return new DefaultDataBagIterator();
@@ -110,15 +110,12 @@ public class DefaultDataBag extends Defa
                     if ((spilled & 0x3fff) == 0) reportProgress();
                 }
                 out.flush();
-                out.close();
-                out = null;
-                mContents.clear();
-            } catch (Throwable e) {
+            } catch (IOException ioe) {
                 // Remove the last file from the spilled array, since we failed to
                 // write to it.
                 mSpillFiles.remove(mSpillFiles.size() - 1);
                 warn(
-                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
+                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
                 return 0;
             } finally {
                 if (out != null) {
@@ -129,6 +126,7 @@ public class DefaultDataBag extends Defa
                     }
                 }
             }
+            mContents.clear();
         }
         // Increment the spill count
         incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
@@ -158,7 +156,7 @@ public class DefaultDataBag extends Defa
         }
 
         @Override
-        public boolean hasNext() {
+        public boolean hasNext() { 
             // Once we call hasNext(), set the flag, so we can call hasNext() repeated without fetching next tuple
             if (hasCachedTuple)
                 return (mBuf != null);
@@ -211,7 +209,7 @@ public class DefaultDataBag extends Defa
                 } catch (FileNotFoundException fnfe) {
                     // We can't find our own spill file?  That should never
                     // happen.
-                    String msg = "Unable to find our spill file.";
+                    String msg = "Unable to find our spill file."; 
                     log.fatal(msg, fnfe);
                     throw new RuntimeException(msg, fnfe);
                 }
@@ -225,7 +223,7 @@ public class DefaultDataBag extends Defa
                         log.fatal(msg, eof);
                         throw new RuntimeException(msg, eof);
                     } catch (IOException ioe) {
-                        String msg = "Unable to read our spill file.";
+                        String msg = "Unable to read our spill file."; 
                         log.fatal(msg, ioe);
                         throw new RuntimeException(msg, ioe);
                     }
@@ -261,7 +259,7 @@ public class DefaultDataBag extends Defa
                         log.warn("Failed to close spill file.", e);
                     }
                 } catch (IOException ioe) {
-                    String msg = "Unable to read our spill file.";
+                    String msg = "Unable to read our spill file."; 
                     log.fatal(msg, ioe);
                     throw new RuntimeException(msg, ioe);
                 }

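The DefaultDataBag hunks adjust when the in-memory contents are cleared relative to the spill stream being flushed and closed. A simplified sketch of that spill loop, assuming Pig's InterSedes serializer; the error handling, progress reporting and spill-file bookkeeping of the real method are omitted.

// Illustrative sketch of the spill pattern, not the actual implementation.
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import org.apache.pig.data.DataType;
import org.apache.pig.data.InterSedes;
import org.apache.pig.data.InterSedesFactory;
import org.apache.pig.data.Tuple;

public class SpillSketch {
    public static long spill(List<Tuple> mContents, File spillFile) throws IOException {
        InterSedes sedes = InterSedesFactory.getInterSedesInstance();
        long spilled = 0;
        DataOutputStream out = new DataOutputStream(
                new BufferedOutputStream(new FileOutputStream(spillFile)));
        try {
            for (Tuple t : mContents) {
                sedes.writeDatum(out, t, DataType.TUPLE);
                spilled++;
            }
            out.flush();
        } finally {
            out.close();
        }
        // Only cleared once the spill file has been written and closed,
        // so a failed spill leaves the tuples in memory.
        mContents.clear();
        return spilled;
    }
}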
Modified: pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java Fri Feb 24 03:34:37 2017
@@ -67,17 +67,17 @@ public class DistinctDataBag extends Def
     public boolean isSorted() {
         return false;
     }
-
+    
     @Override
     public boolean isDistinct() {
         return true;
     }
-
-
+    
+    
     @Override
     public long size() {
         if (mSpillFiles != null && mSpillFiles.size() > 0){
-            //We need to racalculate size to guarantee a count of unique
+            //We need to racalculate size to guarantee a count of unique 
             //entries including those on disk
             Iterator<Tuple> iter = iterator();
             int newSize = 0;
@@ -85,7 +85,7 @@ public class DistinctDataBag extends Def
                 newSize++;
                 iter.next();
             }
-
+            
             synchronized(mContents) {
                 //we don't want adds to change our numbers
                 //the lock may need to cover more of the method
@@ -94,8 +94,8 @@ public class DistinctDataBag extends Def
         }
         return mSize;
     }
-
-
+    
+    
     @Override
     public Iterator<Tuple> iterator() {
         return new DistinctDataBagIterator();
@@ -155,15 +155,12 @@ public class DistinctDataBag extends Def
                     }
                 }
                 out.flush();
-                out.close();
-                out = null;
-                mContents.clear();
-            } catch (Throwable e) {
+            } catch (IOException ioe) {
                 // Remove the last file from the spilled array, since we failed to
                 // write to it.
                 mSpillFiles.remove(mSpillFiles.size() - 1);
                 warn(
-                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
+                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
                 return 0;
             } finally {
                 if (out != null) {
@@ -174,6 +171,7 @@ public class DistinctDataBag extends Def
                     }
                 }
             }
+            mContents.clear();
         }
         // Increment the spill count
         incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
@@ -210,7 +208,7 @@ public class DistinctDataBag extends Def
 
             @Override
             public int hashCode() {
-                return tuple.hashCode();
+                return tuple.hashCode(); 
             }
         }
 
@@ -239,7 +237,7 @@ public class DistinctDataBag extends Def
         }
 
         @Override
-        public boolean hasNext() {
+        public boolean hasNext() { 
             // See if we can find a tuple.  If so, buffer it.
             mBuf = next();
             return mBuf != null;
@@ -297,7 +295,7 @@ public class DistinctDataBag extends Def
                 } catch (FileNotFoundException fnfe) {
                     // We can't find our own spill file?  That should never
                     // happen.
-                    String msg = "Unable to find our spill file.";
+                    String msg = "Unable to find our spill file."; 
                     log.fatal(msg, fnfe);
                     throw new RuntimeException(msg, fnfe);
                 }
@@ -348,7 +346,7 @@ public class DistinctDataBag extends Def
                 Iterator<File> i = mSpillFiles.iterator();
                 while (i.hasNext()) {
                     try {
-                        DataInputStream in =
+                        DataInputStream in = 
                             new DataInputStream(new BufferedInputStream(
                                 new FileInputStream(i.next())));
                         mStreams.add(in);
@@ -504,7 +502,7 @@ public class DistinctDataBag extends Def
                             addToQueue(null, mStreams.size() - 1);
                             i.remove();
                             filesToDelete.add(f);
-
+                            
                         } catch (FileNotFoundException fnfe) {
                             // We can't find our own spill file?  That should
                             // neer happen.
@@ -547,7 +545,7 @@ public class DistinctDataBag extends Def
                         log.warn("Failed to delete spill file: " + f.getPath());
                     }
                 }
-
+                
                 // clear the list, so that finalize does not delete any files,
                 // when mSpillFiles is assigned a new value
                 mSpillFiles.clear();
@@ -562,6 +560,6 @@ public class DistinctDataBag extends Def
             }
         }
     }
-
+    
 }
 

Modified: pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java Fri Feb 24 03:34:37 2017
@@ -50,9 +50,6 @@ public class ReadOnceBag implements Data
      */
     private static final long serialVersionUID = 2L;
 
-    public ReadOnceBag() {
-    }
-
     /**
      * This constructor creates a bag out of an existing iterator
      * of tuples by taking ownership of the iterator and NOT

Modified: pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java Fri Feb 24 03:34:37 2017
@@ -39,7 +39,6 @@ import org.apache.pig.data.utils.Structu
 import org.apache.pig.data.utils.StructuresHelper.Triple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.util.Utils;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -273,20 +272,14 @@ public class SchemaTupleBackend {
     private static SchemaTupleBackend stb;
 
     public static void initialize(Configuration jConf, PigContext pigContext) throws IOException {
-        if (stb != null) {
-            SchemaTupleFrontend.lazyReset(pigContext);
-        }
-        initialize(jConf, pigContext.getExecType().isLocal());
+        initialize(jConf, pigContext, pigContext.getExecType().isLocal());
     }
 
-    public static void initialize(Configuration jConf) throws IOException {
-        initialize(jConf, Utils.isLocal(jConf));
-    }
-
-    public static void initialize(Configuration jConf, boolean isLocal) throws IOException {
+    public static void initialize(Configuration jConf, PigContext pigContext, boolean isLocal) throws IOException {
         if (stb != null) {
             LOG.warn("SchemaTupleBackend has already been initialized");
         } else {
+            SchemaTupleFrontend.lazyReset(pigContext);
             SchemaTupleFrontend.reset();
             SchemaTupleBackend stbInstance = new SchemaTupleBackend(jConf, isLocal);
             stbInstance.copyAndResolve();

Modified: pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java Fri Feb 24 03:34:37 2017
@@ -32,7 +32,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.ListIterator;
 import java.util.PriorityQueue;
-
+  
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigCounters;
@@ -44,14 +44,14 @@ import org.apache.pig.PigWarning;
  * stored unsorted as it comes in, and only sorted when it is time to dump
  * it to a file or when the first iterator is requested.  Experementation
  * found this to be the faster than storing it sorted to begin with.
- *
+ * 
  * We allow a user defined comparator, but provide a default comparator in
  * cases where the user doesn't specify one.
  */
 public class SortedDataBag extends DefaultAbstractBag{
 
     /**
-     *
+     * 
      */
     private static final long serialVersionUID = 2L;
 
@@ -76,7 +76,7 @@ public class SortedDataBag extends Defau
 
         @Override
         public int hashCode() {
-            return 42;
+            return 42; 
         }
 
     }
@@ -95,12 +95,12 @@ public class SortedDataBag extends Defau
     public boolean isSorted() {
         return true;
     }
-
+    
     @Override
     public boolean isDistinct() {
         return false;
     }
-
+    
     @Override
     public Iterator<Tuple> iterator() {
         return new SortedDataBagIterator();
@@ -145,15 +145,12 @@ public class SortedDataBag extends Defau
                     if ((spilled & 0x3fff) == 0) reportProgress();
                 }
                 out.flush();
-                out.close();
-                out = null;
-                mContents.clear();
-            } catch (Throwable e) {
+            } catch (IOException ioe) {
                 // Remove the last file from the spilled array, since we failed to
                 // write to it.
                 mSpillFiles.remove(mSpillFiles.size() - 1);
                 warn(
-                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
+                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
                 return 0;
             } finally {
                 if (out != null) {
@@ -164,6 +161,7 @@ public class SortedDataBag extends Defau
                     }
                 }
             }
+            mContents.clear();
         }
         // Increment the spill count
         incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
@@ -205,7 +203,7 @@ public class SortedDataBag extends Defau
 
             @Override
             public int hashCode() {
-                return tuple.hashCode();
+                return tuple.hashCode(); 
             }
         }
 
@@ -230,7 +228,7 @@ public class SortedDataBag extends Defau
         }
 
         @Override
-        public boolean hasNext() {
+        public boolean hasNext() { 
             // See if we can find a tuple.  If so, buffer it.
             mBuf = next();
             return mBuf != null;
@@ -343,7 +341,7 @@ public class SortedDataBag extends Defau
                 Iterator<File> i = mSpillFiles.iterator();
                 while (i.hasNext()) {
                     try {
-                        DataInputStream in =
+                        DataInputStream in = 
                             new DataInputStream(new BufferedInputStream(
                                 new FileInputStream(i.next())));
                         mStreams.add(in);
@@ -353,7 +351,7 @@ public class SortedDataBag extends Defau
                     } catch (FileNotFoundException fnfe) {
                         // We can't find our own spill file?  That should
                         // never happen.
-                        String msg = "Unable to find our spill file.";
+                        String msg = "Unable to find our spill file."; 
                         log.fatal(msg, fnfe);
                         throw new RuntimeException(msg, fnfe);
                     }
@@ -413,7 +411,7 @@ public class SortedDataBag extends Defau
                         in.close();
                     }catch(IOException e) {
                         log.warn("Failed to close spill file.", e);
-                    }
+                    }                	
                     mStreams.set(fileNum, null);
                 } catch (IOException ioe) {
                     String msg = "Unable to find our spill file.";
@@ -520,7 +518,7 @@ public class SortedDataBag extends Defau
                         log.warn("Failed to delete spill file: " + f.getPath());
                     }
                 }
-
+                
                 // clear the list, so that finalize does not delete any files,
                 // when mSpillFiles is assigned a new value
                 mSpillFiles.clear();