Posted to commits@pig.apache.org by zl...@apache.org on 2017/04/12 02:20:22 UTC

svn commit: r1791060 [4/4] - in /pig/branches/spark: ./ bin/ src/docs/src/documentation/content/xdocs/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer...

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java Wed Apr 12 02:20:20 2017
@@ -42,28 +42,28 @@ import org.apache.pig.tools.pigstats.Pig
 
 public class PigInputFormatSpark extends PigInputFormat {
 
-	@Override
-	public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
-			TaskAttemptContext context) throws IOException,
-			InterruptedException {
-        initLogger();
-        resetUDFContext();
-        //PigSplit#conf is the default hadoop configuration, we need get the configuration
-        //from context.getConfigration() to retrieve pig properties
-        PigSplit pigSplit = (PigSplit) split;
-        Configuration conf = context.getConfiguration();
-        pigSplit.setConf(conf);
-        //Set current splitIndex in PigMapReduce.sJobContext.getConfiguration.get(PigImplConstants.PIG_SPLIT_INDEX)
-        //which will be used in POMergeCogroup#setup
-        if (PigMapReduce.sJobContext == null) {
-            PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new JobID());
-        }
-        PigMapReduce.sJobContext.getConfiguration().setInt(PigImplConstants.PIG_SPLIT_INDEX, pigSplit.getSplitIndex());
-        // Here JobConf is first available in spark Executor thread, we initialize PigContext,UDFContext and
-        // SchemaTupleBackend by reading properties from JobConf
-        initialize(conf);
-        return super.createRecordReader(split, context);
+@Override
+public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
+                                                    TaskAttemptContext context) throws IOException,
+        InterruptedException {
+    initLogger();
+    resetUDFContext();
+    //PigSplit#conf is the default hadoop configuration; we need to get the configuration
+    //from context.getConfiguration() to retrieve pig properties
+    PigSplit pigSplit = (PigSplit) split;
+    Configuration conf = context.getConfiguration();
+    pigSplit.setConf(conf);
+    //Set current splitIndex in PigMapReduce.sJobContext.getConfiguration.get(PigImplConstants.PIG_SPLIT_INDEX)
+    //which will be used in POMergeCogroup#setup
+    if (PigMapReduce.sJobContext == null) {
+        PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new JobID());
     }
+    PigMapReduce.sJobContext.getConfiguration().setInt(PigImplConstants.PIG_SPLIT_INDEX, pigSplit.getSplitIndex());
+    // Here JobConf is first available in spark Executor thread, we initialize PigContext,UDFContext and
+    // SchemaTupleBackend by reading properties from JobConf
+    initialize(conf);
+    return super.createRecordReader(split, context);
+}
 
     private void initialize(Configuration jobConf) throws IOException {
         MapRedUtil.setupUDFContext(jobConf);
@@ -73,12 +73,12 @@ public class PigInputFormatSpark extends
     }
 
     private void resetUDFContext() {
-		UDFContext.getUDFContext().reset();
-	}
+        UDFContext.getUDFContext().reset();
+    }
 
-	private void initLogger() {
-		PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
-		pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
-		PhysicalOperator.setPigLogger(pigHadoopLogger);
-	}
-}
\ No newline at end of file
+    private void initLogger() {
+        PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+        pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
+        PhysicalOperator.setPigLogger(pigHadoopLogger);
+    }
+}
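
The hunk above is a whitespace-only reindent of createRecordReader() (tabs replaced with four-space indents); its logic is unchanged. The contract it preserves is that the current split index is published through the shared PigMapReduce.sJobContext before the parent reader is created, so that a downstream operator such as POMergeCogroup can read it back during setup. A minimal sketch of that consumer side, assuming only the configuration key used above (illustrative, not the actual POMergeCogroup code):

    // Recover the split index published by PigInputFormatSpark; -1 if unset.
    int splitIndex = PigMapReduce.sJobContext.getConfiguration()
            .getInt(PigImplConstants.PIG_SPLIT_INDEX, -1);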

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java Wed Apr 12 02:20:20 2017
@@ -291,7 +291,7 @@ public class TezPlanContainer extends Op
         Set<TezOperator> splitters2 = new HashSet<>();
         Set<TezOperator> processedPredecessors = new HashSet<>();
         // Find predecessors which are splitters
-        fetchSplitterPredecessors(plan, operToSegment, processedPredecessors, splitters1);
+        fetchSplitterPredecessors(plan, operToSegment, processedPredecessors, splitters1, false);
         if (!splitters1.isEmpty()) {
             // For the successor, traverse rest of the plan below it and
             // search the predecessors of its successors to find any predecessor that might be a splitter.
@@ -300,7 +300,7 @@ public class TezPlanContainer extends Op
             processedPredecessors.clear();
             processedPredecessors.add(successor);
             for (TezOperator succ : allSuccs) {
-                fetchSplitterPredecessors(plan, succ, processedPredecessors, splitters2);
+                fetchSplitterPredecessors(plan, succ, processedPredecessors, splitters2, true);
             }
             // Find the common ones
             splitters1.retainAll(splitters2);
@@ -309,7 +309,7 @@ public class TezPlanContainer extends Op
     }
 
     private void fetchSplitterPredecessors(TezOperPlan plan, TezOperator tezOp,
-            Set<TezOperator> processedPredecessors, Set<TezOperator> splitters) {
+            Set<TezOperator> processedPredecessors, Set<TezOperator> splitters, boolean stopAtSplit) {
         List<TezOperator> predecessors = plan.getPredecessors(tezOp);
         if (predecessors != null) {
             for (TezOperator pred : predecessors) {
@@ -319,9 +319,13 @@ public class TezPlanContainer extends Op
                 }
                 if (pred.isSplitter()) {
                     splitters.add(pred);
+                    if (!stopAtSplit) {
+                        processedPredecessors.add(pred);
+                        fetchSplitterPredecessors(plan, pred, processedPredecessors, splitters, stopAtSplit);
+                    }
                 } else if (!pred.needSegmentBelow()) {
                     processedPredecessors.add(pred);
-                    fetchSplitterPredecessors(plan, pred, processedPredecessors, splitters);
+                    fetchSplitterPredecessors(plan, pred, processedPredecessors, splitters, stopAtSplit);
                 }
             }
         }
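
The new stopAtSplit flag controls how far the recursion walks: the first call (stopAtSplit=false) collects splitter predecessors transitively, continuing through each splitter it finds, while the per-successor calls (stopAtSplit=true) record a splitter and stop on that path. A self-contained sketch of the same traversal pattern over a generic predecessor map (all names here are illustrative, not Pig API; the real method additionally stops at predecessors that need a segment below, which is omitted for brevity):

    import java.util.*;
    import java.util.function.Predicate;

    // Collect "marker" predecessors of a node, optionally walking past them.
    static void collectMarkers(Map<String, List<String>> preds, String node,
            Set<String> visited, Set<String> markers,
            Predicate<String> isMarker, boolean stopAtMarker) {
        List<String> ps = preds.get(node);
        if (ps == null) {
            return;
        }
        for (String p : ps) {
            if (!visited.add(p)) {
                continue;                  // already processed
            }
            if (isMarker.test(p)) {
                markers.add(p);
                if (!stopAtMarker) {       // keep walking through the marker
                    collectMarkers(preds, p, visited, markers, isMarker, stopAtMarker);
                }
            } else {
                collectMarkers(preds, p, visited, markers, isMarker, stopAtMarker);
            }
        }
    }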

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java Wed Apr 12 02:20:20 2017
@@ -37,6 +37,6 @@ public class AccumulatorOptimizer extend
 
     @Override
     public void visitTezOp(TezOperator tezOp) throws VisitorException {
-        AccumulatorOptimizerUtil.addAccumulator(tezOp.plan);
+        AccumulatorOptimizerUtil.addAccumulator(tezOp.plan, tezOp.plan.getRoots());
     }
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java Wed Apr 12 02:20:20 2017
@@ -102,7 +102,8 @@ public class SecondaryKeyOptimizerTez ex
             rearrangePlan = PlanHelper.getLocalRearrangePlanFromSplit(from.plan, connectingLR.getOperatorKey());
         }
 
-        SecondaryKeyOptimizerInfo info = SecondaryKeyOptimizerUtil.applySecondaryKeySort(rearrangePlan, to.plan);
+        SecondaryKeyOptimizerUtil secondaryKeyOptUtil = new SecondaryKeyOptimizerUtil();
+        SecondaryKeyOptimizerInfo info = secondaryKeyOptUtil.applySecondaryKeySort(rearrangePlan, to.plan);
         if (info != null) {
             numSortRemoved += info.getNumSortRemoved();
             numDistinctChanged += info.getNumDistinctChanged();

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java Wed Apr 12 02:20:20 2017
@@ -61,9 +61,8 @@ public class AccumulatorOptimizerUtil {
         return batchSize;
     }
 
-    public static void addAccumulator(PhysicalPlan plan) {
+    public static void addAccumulator(PhysicalPlan plan, List<PhysicalOperator> pos) {
         // See if this is a map-reduce job
-        List<PhysicalOperator> pos = plan.getRoots();
         if (pos == null || pos.size() == 0) {
             return;
         }
@@ -290,91 +289,4 @@ public class AccumulatorOptimizerUtil {
 
         return false;
     }
-
-    public static void addAccumulatorSpark(PhysicalPlan plan) throws
-        VisitorException {
-        List<PhysicalOperator> pos = plan.getRoots();
-        if (pos == null || pos.size() == 0) {
-            return;
-        }
-
-        List<POGlobalRearrangeSpark> gras = PlanHelper.getPhysicalOperators(plan,
-                POGlobalRearrangeSpark.class);
-
-        for (POGlobalRearrange gra : gras) {
-            addAccumulatorSparkForGRASubDAG(plan, gra);
-        }
-    }
-
-
-    private static void addAccumulatorSparkForGRASubDAG(PhysicalPlan plan,
-        POGlobalRearrange gra) throws VisitorException {
-
-        List<PhysicalOperator> poPackages = plan.getSuccessors(gra);
-
-        if (poPackages == null || poPackages.size() == 0) {
-            return;
-        }
-        // See if this is a POPackage
-        PhysicalOperator po_package = poPackages.get(0);
-        if (!po_package.getClass().equals(POPackage.class)) {
-            return;
-        }
-
-        Packager pkgr = ((POPackage) po_package).getPkgr();
-        // Check that this is a standard package, not a subclass
-        if (!pkgr.getClass().equals(Packager.class)) {
-            return;
-        }
-
-        // if POPackage is for distinct, just return
-        if (pkgr.isDistinct()) {
-            return;
-        }
-
-        // if any input to POPackage is inner, just return
-        boolean[] isInner = pkgr.getInner();
-        for (boolean b : isInner) {
-            if (b) {
-                return;
-            }
-        }
-
-        List<PhysicalOperator> l = plan.getSuccessors(po_package);
-        // there should be only one POForEach
-        if (l == null || l.size() == 0 || l.size() > 1) {
-            return;
-        }
-
-        PhysicalOperator po_foreach = l.get(0);
-        if (!(po_foreach instanceof POForEach)) {
-            return;
-        }
-
-        boolean foundUDF = false;
-        List<PhysicalPlan> list = ((POForEach) po_foreach).getInputPlans();
-        for (PhysicalPlan p : list) {
-            PhysicalOperator po = p.getLeaves().get(0);
-
-            // only expression operators are allowed
-            if (!(po instanceof ExpressionOperator)) {
-                return;
-            }
-
-            if (((ExpressionOperator) po).containUDF()) {
-                foundUDF = true;
-            }
-
-            if (!check(po)) {
-                return;
-            }
-        }
-
-        if (foundUDF) {
-            // if all tests are passed, reducer can run in accumulative mode
-            LOG.info("Reducer is to run in accumulative mode.");
-            po_package.setAccumulative();
-            po_foreach.setAccumulative();
-        }
-    }
 }
\ No newline at end of file
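
addAccumulator() now takes its starting operators as a parameter instead of always calling plan.getRoots() itself, and the Spark-specific addAccumulatorSpark() logic is removed from this shared utility (the diff does not show where it moves). With the new signature, a Spark-side caller could start the analysis from each global-rearrange sub-DAG; a hedged sketch based on the removed code, not shown in this commit:

    // Hypothetical Spark-side caller: run accumulator analysis on the
    // successors of each POGlobalRearrangeSpark instead of the plan roots.
    for (POGlobalRearrangeSpark gra : PlanHelper.getPhysicalOperators(plan,
            POGlobalRearrangeSpark.class)) {
        AccumulatorOptimizerUtil.addAccumulator(plan, plan.getSuccessors(gra));
    }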

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java Wed Apr 12 02:20:20 2017
@@ -53,12 +53,12 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 
-@InterfaceAudience.Private
+@InterfaceAudience.Public
 public class SecondaryKeyOptimizerUtil {
     private static Log log = LogFactory.getLog(SecondaryKeyOptimizerUtil.class.getName());
     private static boolean isSparkMode;
 
-    private SecondaryKeyOptimizerUtil() {
+    public SecondaryKeyOptimizerUtil() {
 
     }
 
@@ -186,7 +186,7 @@ public class SecondaryKeyOptimizerUtil {
         return result;
     }
 
-    public static SecondaryKeyOptimizerInfo applySecondaryKeySort(PhysicalPlan mapPlan, PhysicalPlan reducePlan) throws VisitorException {
+    public SecondaryKeyOptimizerInfo applySecondaryKeySort(PhysicalPlan mapPlan, PhysicalPlan reducePlan) throws VisitorException {
         log.trace("Entering SecondaryKeyOptimizerUtil.addSecondaryKeySort");
         SecondaryKeyOptimizerInfo secKeyOptimizerInfo = new SecondaryKeyOptimizerInfo();
         List<SortKeyInfo> sortKeyInfos = new ArrayList<SortKeyInfo>();
@@ -245,29 +245,7 @@ public class SecondaryKeyOptimizerUtil {
         }
 
         PhysicalOperator root = reduceRoots.get(0);
-        PhysicalOperator currentNode = null;
-        if (!isSparkMode) {
-            if (!(root instanceof POPackage)) {
-                log.debug("Expected reduce root to be a POPackage, skip secondary key optimizing");
-                return null;
-            } else {
-                currentNode = root;
-            }
-        } else {
-            if (!(root instanceof POGlobalRearrange)) {
-                log.debug("Expected reduce root to be a POGlobalRearrange, skip secondary key optimizing");
-                return null;
-            } else {
-                List<PhysicalOperator> globalRearrangeSuccs = reducePlan
-                        .getSuccessors(root);
-                if (globalRearrangeSuccs.size() == 1) {
-                    currentNode = globalRearrangeSuccs.get(0);
-                } else {
-                    log.debug("Expected successor of a POGlobalRearrange is POPackage, skip secondary key optimizing");
-                    return null;
-                }
-            }
-        }
+        PhysicalOperator currentNode = getCurrentNode(root,reducePlan);
 
         // visit the POForEach of the reduce plan. We can have Limit and Filter
         // in the middle
@@ -442,6 +420,16 @@ public class SecondaryKeyOptimizerUtil {
         return secKeyOptimizerInfo;
     }
 
+    protected PhysicalOperator getCurrentNode(PhysicalOperator root, PhysicalPlan reducePlan) {
+        PhysicalOperator currentNode = null;
+        if (!(root instanceof POPackage)) {
+            log.debug("Expected reduce root to be a POPackage, skip secondary key optimizing");
+        } else {
+            currentNode = root;
+        }
+        return currentNode;
+    }
+
     private static void setSecondaryPlan(PhysicalPlan plan, POLocalRearrange rearrange,
             SortKeyInfo secondarySortKeyInfo) throws VisitorException {
         // Put plan to project secondary key to the POLocalRearrange
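
Making applySecondaryKeySort() an instance method and extracting the protected getCurrentNode() hook replaces the old static isSparkMode branch: the base class handles the case where the reduce root is a POPackage, and a backend can subclass to locate the packaging operator differently. A plausible Spark-side override, reconstructed from the branch removed above (the subclass itself is not part of this diff):

    public class SecondaryKeyOptimizerSparkUtil extends SecondaryKeyOptimizerUtil {
        @Override
        protected PhysicalOperator getCurrentNode(PhysicalOperator root,
                PhysicalPlan reducePlan) {
            if (!(root instanceof POGlobalRearrange)) {
                return null;   // expected a POGlobalRearrange; skip optimizing
            }
            // the single successor of the POGlobalRearrange should be the POPackage
            List<PhysicalOperator> succs = reducePlan.getSuccessors(root);
            return (succs != null && succs.size() == 1) ? succs.get(0) : null;
        }
    }

Note that the rewritten caller assigns getCurrentNode()'s result without an early return, so preserving the old skip behavior requires a null check after the call (not visible in this hunk).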

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Wed Apr 12 02:20:20 2017
@@ -308,7 +308,7 @@ public class HBaseStorage extends LoadFu
         //so we need check whether UDFContext.getUDFContext().getClientSystemProps()
         //is null or not, if is null, defaultCaster =STRING_CASTER, otherwise is
         //UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER)
-        //Detail see PIG-4611
+        //For details, see PIG-4920
         String defaultCaster = UDFContext.getUDFContext().getClientSystemProps() != null ? UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER) : STRING_CASTER;
         String casterOption = configuredOptions_.getOptionValue("caster", defaultCaster);
         if (STRING_CASTER.equalsIgnoreCase(casterOption)) {

Modified: pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java Wed Apr 12 02:20:20 2017
@@ -22,6 +22,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 
+import java.io.Serializable;
+
 /**
  * Class to hold code common to self spilling bags such as InternalCachedBag
  */
@@ -30,8 +32,7 @@ import org.apache.pig.classification.Int
 public abstract class SelfSpillBag extends DefaultAbstractBag {
     private static final long serialVersionUID = 1L;
     // SelfSpillBag$MemoryLimits is not serializable
-    //in spark mode, if we don't set memLimit transient, it will throw NotSerializableExecption(See PIG-4611)
-    protected transient MemoryLimits memLimit;
+    protected MemoryLimits memLimit;
 
     public SelfSpillBag(int bagCount) {
         memLimit = new MemoryLimits(bagCount, -1);
@@ -49,10 +50,11 @@ public abstract class SelfSpillBag exten
      * The number of objects that will fit into this memory limit is computed
      * using the average memory size of the objects whose size is given to this
      * class.
+     * In Spark mode, MemoryLimits needs to implement the Serializable interface, otherwise a NotSerializableException will be thrown (see PIG-4611)
      */
     @InterfaceAudience.Private
     @InterfaceStability.Evolving
-    public static class MemoryLimits {
+    public static class MemoryLimits implements Serializable {
 
         private long maxMemUsage;
         private long cacheLimit = Integer.MAX_VALUE;
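
The switch from a transient field to a Serializable MemoryLimits matters on Spark because bags can travel with the serialized task: a transient field comes back as null after deserialization, so the bag would hit a NullPointerException on first use in the executor. A minimal standalone demonstration of the difference (plain Java serialization, nothing Pig-specific):

    import java.io.*;

    public class TransientDemo {
        static class Holder implements Serializable {
            transient String transientRef = "lost";   // null after a round-trip
            String serializableRef = "kept";          // survives a round-trip
        }

        public static void main(String[] args) throws Exception {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(new Holder());
            oos.close();
            ObjectInputStream ois = new ObjectInputStream(
                    new ByteArrayInputStream(bos.toByteArray()));
            Holder h = (Holder) ois.readObject();
            System.out.println(h.transientRef);     // prints: null
            System.out.println(h.serializableRef);  // prints: kept
        }
    }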

Modified: pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java Wed Apr 12 02:20:20 2017
@@ -23,11 +23,14 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Properties;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 
 public class UDFContext {
 
+    private static final Log LOG = LogFactory.getLog(UDFContext.class);
     private Configuration jconf = null;
     private HashMap<UDFContextKey, Properties> udfConfs;
     private Properties clientSysProps;
@@ -204,6 +207,17 @@ public class UDFContext {
         conf.set(CLIENT_SYS_PROPS, ObjectSerializer.serialize(clientSysProps));
     }
 
+    /*
+     called by SparkEngineConf#writeObject
+     */
+    public String serialize() {
+        try {
+            return ObjectSerializer.serialize(udfConfs);
+        } catch (IOException e) {
+            LOG.error("UDFContext#serialize throws error ",e);
+            return null;
+        }
+    }
 
     /**
      * Populate the udfConfs field.  This function is intended to
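
The new serialize() helper swallows the IOException and returns null, trading a hard failure for a possible silent loss of UDF configuration, so callers must check for null. A hedged sketch of the producer side (the comment above names SparkEngineConf#writeObject, which this diff does not show; "out" is a hypothetical ObjectOutputStream):

    String udfConfs = UDFContext.getUDFContext().serialize();
    if (udfConfs != null) {
        out.writeObject(udfConfs);   // ship the UDF configs with the task
    }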

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOStore.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOStore.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOStore.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOStore.java Wed Apr 12 02:20:20 2017
@@ -36,6 +36,7 @@ public class LOStore extends LogicalRela
     private boolean isTmpStore;
     private SortInfo sortInfo;
     private final StoreFuncInterface storeFunc;
+    private boolean disambiguationEnabled = true;
 
     public LOStore(LogicalPlan plan, FileSpec outputFileSpec, StoreFuncInterface storeFunc, String signature) {
         super("LOStore", plan);
@@ -43,6 +44,12 @@ public class LOStore extends LogicalRela
         this.storeFunc = storeFunc;
         this.signature = signature;
     }
+
+    public LOStore(LogicalPlan plan, FileSpec outputFileSpec, StoreFuncInterface storeFunc, String signature,
+                   boolean disambiguationEnabled) {
+        this(plan, outputFileSpec, storeFunc, signature);
+        this.disambiguationEnabled = disambiguationEnabled;
+    }
     
     public FileSpec getOutputSpec() {
         return output;
@@ -55,6 +62,17 @@ public class LOStore extends LogicalRela
     @Override
     public LogicalSchema getSchema() throws FrontendException {
         schema = ((LogicalRelationalOperator)plan.getPredecessors(this).get(0)).getSchema();
+
+        if (!disambiguationEnabled && schema != null && schema.getFields() != null) {
+            //If requested try and remove parent alias substring including colon(s)
+            for (LogicalSchema.LogicalFieldSchema field : schema.getFields()) {
+                if (field.alias == null || !field.alias.contains(":")) {
+                    continue;
+                }
+                field.alias = field.alias.substring(field.alias.lastIndexOf(":") + 1);
+            }
+        }
+
         return schema;
     }
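
When disambiguation is disabled, the loop in getSchema() strips everything up to and including the last colon from each aliased field. What that does to one alias (illustrative):

    String alias = "D::group::fruit";
    alias = alias.substring(alias.lastIndexOf(":") + 1);   // -> "fruit"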
 

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java Wed Apr 12 02:20:20 2017
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.pig.FuncSpec;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -94,7 +95,10 @@ public class ScalarVisitor extends AllEx
                     StoreFuncInterface stoFunc = (StoreFuncInterface)PigContext.instantiateFuncFromSpec(interStorageFuncSpec);
                     String sig = LogicalPlanBuilder.newOperatorKey(scope);
                     stoFunc.setStoreFuncUDFContextSignature(sig);
-                    store = new LOStore(lp, fileSpec, stoFunc, sig);
+                    boolean disambiguationEnabled = Boolean.parseBoolean(pigContext.getProperties().
+                            getProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT));
+
+                    store = new LOStore(lp, fileSpec, stoFunc, sig, disambiguationEnabled);
                     store.setTmpStore(true);
                     lp.add( store );
                     lp.connect( refOp, store );

Modified: pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java Wed Apr 12 02:20:20 2017
@@ -1003,8 +1003,10 @@ public class LogicalPlanBuilder {
                 fileNameMap.put(fileNameKey, absolutePath);
             }
             FileSpec fileSpec = new FileSpec(absolutePath, funcSpec);
+            boolean disambiguationEnabled = Boolean.parseBoolean(pigContext.getProperties().
+                    getProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT));
 
-            LOStore op = new LOStore(plan, fileSpec, stoFunc, signature);
+            LOStore op = new LOStore(plan, fileSpec, stoFunc, signature, disambiguationEnabled);
             return buildOp(loc, op, alias, inputAlias, null);
         } catch(Exception ex) {
             throw new ParserValidationException(intStream, loc, ex);

Modified: pig/branches/spark/src/org/apache/pig/parser/QueryParserUtils.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/QueryParserUtils.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/QueryParserUtils.java (original)
+++ pig/branches/spark/src/org/apache/pig/parser/QueryParserUtils.java Wed Apr 12 02:20:20 2017
@@ -70,8 +70,10 @@ public class QueryParserUtils {
         fileName = removeQuotes( fileName );
         FileSpec fileSpec = new FileSpec( fileName, funcSpec );
         String sig = alias + "_" + LogicalPlanBuilder.newOperatorKey(scope);
+        boolean disambiguationEnabled = Boolean.parseBoolean(pigContext.getProperties().
+                getProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT));
         stoFunc.setStoreFuncUDFContextSignature(sig);
-        LOStore store = new LOStore(lp, fileSpec, stoFunc, sig);
+        LOStore store = new LOStore(lp, fileSpec, stoFunc, sig, disambiguationEnabled);
         store.setAlias(alias);
 
         try {
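
The same property lookup is now duplicated at three call sites (ScalarVisitor, LogicalPlanBuilder, QueryParserUtils). A small helper could centralize it; a hypothetical refactor, not part of this commit:

    // Hypothetical helper: one place to resolve the disambiguation setting.
    static boolean isDisambiguationEnabled(PigContext pigContext) {
        return Boolean.parseBoolean(pigContext.getProperties().getProperty(
                PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,
                PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT));
    }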

Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java Wed Apr 12 02:20:20 2017
@@ -23,6 +23,7 @@ import java.io.SequenceInputStream;
 import java.util.Enumeration;
 
 import jline.console.ConsoleReader;
+import jline.console.history.FileHistory;
 
 /** Borrowed from jline.console.internal.ConsoleReaderInputStream. However,
  *  we cannot use ConsoleReaderInputStream directly since:
@@ -104,6 +105,9 @@ public class ConsoleReaderInputStream ex
 
             if (buffer == null) {
                 buffer = reader.readLine().getBytes();
+
+                //Write current grunt buffer to pig history file
+                ((FileHistory)reader.getHistory()).flush();
             }
 
             if (buffer == null) {
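
The added flush writes the grunt history to disk after every line, but the unchecked cast assumes the reader's history is file-backed. A defensive variant (illustrative, not part of this commit) that avoids a ClassCastException when a jline MemoryHistory is in use:

    if (reader.getHistory() instanceof FileHistory) {
        ((FileHistory) reader.getHistory()).flush();
    }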

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Wed Apr 12 02:20:20 2017
@@ -21,6 +21,7 @@ package org.apache.pig.tools.pigstats.sp
 import java.util.List;
 import java.util.Map;
 
+import org.apache.pig.tools.pigstats.*;
 import scala.Option;
 
 import org.apache.hadoop.conf.Configuration;
@@ -31,11 +32,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.newplan.PlanVisitor;
-import org.apache.pig.tools.pigstats.InputStats;
-import org.apache.pig.tools.pigstats.JobStats;
-import org.apache.pig.tools.pigstats.OutputStats;
-import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
 import org.apache.spark.executor.ShuffleReadMetrics;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.executor.TaskMetrics;
@@ -72,7 +68,7 @@ public class SparkJobStats extends JobSt
             long bytes = getOutputSize(poStore, conf);
             long recordsCount = -1;
             if (disableCounter == false) {
-                recordsCount = SparkStatsUtil.getStoreSparkCounterValue(poStore);
+                recordsCount = SparkStatsUtil.getRecordCount(poStore);
             }
             OutputStats outputStats = new OutputStats(poStore.getSFile().getFileName(),
                     bytes, recordsCount, success);
@@ -88,7 +84,7 @@ public class SparkJobStats extends JobSt
 
         long recordsCount = -1;
         if (disableCounter == false) {
-            recordsCount = SparkStatsUtil.getLoadSparkCounterValue(po);
+            recordsCount = SparkStatsUtil.getRecordCount(po);
         }
         long bytesRead = -1;
         if (singleInput && stats.get("BytesRead") != null) {
@@ -190,13 +186,13 @@ public class SparkJobStats extends JobSt
         if (inputMetricExist) {
             results.put("BytesRead", bytesRead);
             hdfsBytesRead = bytesRead;
-            counters.incrCounter(FS_COUNTER_GROUP, MRPigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead);
+            counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead);
         }
 
         if (outputMetricExist) {
             results.put("BytesWritten", bytesWritten);
             hdfsBytesWritten = bytesWritten;
-            counters.incrCounter(FS_COUNTER_GROUP, MRPigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten);
+            counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten);
         }
 
         if (shuffleReadMetricExist) {
@@ -331,7 +327,7 @@ public class SparkJobStats extends JobSt
     private void initializeHadoopCounter() {
         counters = new Counters();
         Counters.Group fsGrp = counters.addGroup(FS_COUNTER_GROUP, FS_COUNTER_GROUP);
-        fsGrp.addCounter(MRPigStatsUtil.HDFS_BYTES_READ, MRPigStatsUtil.HDFS_BYTES_READ, 0);
-        fsGrp.addCounter(MRPigStatsUtil.HDFS_BYTES_WRITTEN, MRPigStatsUtil.HDFS_BYTES_WRITTEN, 0);
+        fsGrp.addCounter(PigStatsUtil.HDFS_BYTES_READ, PigStatsUtil.HDFS_BYTES_READ, 0);
+        fsGrp.addCounter(PigStatsUtil.HDFS_BYTES_WRITTEN, PigStatsUtil.HDFS_BYTES_WRITTEN, 0);
     }
 }
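
Switching from MRPigStatsUtil to the shared PigStatsUtil constants means Spark jobs report under the same counter names as MapReduce, so stats consumers need no per-engine branching. An illustrative read inside this class (hypothetical, not in the diff):

    long bytesReadCounter = counters.findCounter(FS_COUNTER_GROUP,
            PigStatsUtil.HDFS_BYTES_READ).getValue();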

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Wed Apr 12 02:20:20 2017
@@ -93,9 +93,7 @@ public class SparkPigStats extends PigSt
         addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobMetricsListener, conf);
         jobSparkOperatorMap.put(jobStats, sparkOperator);
         jobPlan.add(jobStats);
-        if (e != null) {
-            jobStats.setBackendException(e);
-        }
+        jobStats.setBackendException(e);
     }
 
     public void addNativeJobStats(NativeSparkOperator sparkOperator, String jobId, boolean isSuccess, Exception e) {
@@ -103,9 +101,7 @@ public class SparkPigStats extends PigSt
         jobStats.setSuccessful(isSuccess);
         jobSparkOperatorMap.put(jobStats, sparkOperator);
         jobPlan.add(jobStats);
-        if (e != null) {
-            jobStats.setBackendException(e);
-        }
+        jobStats.setBackendException(e);
     }
 
     public void finish() {
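
Dropping the null guards at both call sites is only safe if the callee tolerates a null argument. An assumption about what that implies, not shown in this diff (field name hypothetical):

    // JobStats.setBackendException would need to be null-tolerant, e.g.:
    public void setBackendException(Exception e) {
        if (e != null) {
            this.backendException = e;   // ignore null instead of storing it
        }
    }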

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Wed Apr 12 02:20:20 2017
@@ -37,10 +37,10 @@ import org.apache.spark.api.java.JavaSpa
 
 public class SparkStatsUtil {
 
-    public static final String SPARK_STORE_COUNTER_GROUP = "Spark Store Counters";
-    public static final String SPARK_STORE_RECORD_COUNTER = "Output records in ";
-    public static final String SPARK_INPUT_COUNTER_GROUP = "Spark Input Counters";
-    public static final String SPARK_INPUT_RECORD_COUNTER = "Input records from ";
+    public static final String SPARK_STORE_COUNTER_GROUP = PigStatsUtil.MULTI_STORE_COUNTER_GROUP;
+    public static final String SPARK_STORE_RECORD_COUNTER = PigStatsUtil.MULTI_STORE_RECORD_COUNTER;
+    public static final String SPARK_INPUT_COUNTER_GROUP = PigStatsUtil.MULTI_INPUTS_COUNTER_GROUP;
+    public static final String SPARK_INPUT_RECORD_COUNTER = PigStatsUtil.MULTI_INPUTS_RECORD_COUNTER;
 
     public static void waitForJobAddStats(int jobID,
                                           POStore poStore, SparkOperator sparkOperator,
@@ -71,7 +71,7 @@ public class SparkStatsUtil {
                 sparkContext, e);
     }
 
-    public static String getStoreSparkCounterName(POStore store) {
+    public static String getCounterName(POStore store) {
         String shortName = PigStatsUtil.getShortName(store.getSFile().getFileName());
 
         StringBuffer sb = new StringBuffer(SPARK_STORE_RECORD_COUNTER);
@@ -84,7 +84,7 @@ public class SparkStatsUtil {
         return sb.toString();
     }
 
-    public static String getLoadSparkCounterName(POLoad load) {
+    public static String getCounterName(POLoad load) {
         String shortName = PigStatsUtil.getShortName(load.getLFile().getFileName());
 
         StringBuffer sb = new StringBuffer(SPARK_INPUT_RECORD_COUNTER);
@@ -95,15 +95,15 @@ public class SparkStatsUtil {
         return sb.toString();
     }
 
-    public static long getStoreSparkCounterValue(POStore store) {
+    public static long getRecordCount(POStore store) {
         SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance();
-        return reporter.getCounters().getValue(SPARK_STORE_COUNTER_GROUP, getStoreSparkCounterName(store));
+        return reporter.getCounters().getValue(SPARK_STORE_COUNTER_GROUP, getCounterName(store));
     }
 
-    public static long getLoadSparkCounterValue(POLoad load) {
+    public static long getRecordCount(POLoad load) {
         SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance();
         int loadersCount = countCoLoadsIfInSplit(load,load.getParentPlan());
-        return reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getLoadSparkCounterName(load))/loadersCount;
+        return reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getCounterName(load))/loadersCount;
     }
 
     private static int countCoLoadsIfInSplit(PhysicalOperator op, PhysicalPlan pp){
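
The counter accessors are renamed to an overloaded getRecordCount(), and the load-side value is divided by the number of co-loads sharing the split, since loads combined into one split increment the same counter (this rationale is an inference; the diff shows only the division). Illustrative usage with hypothetical poStore/poLoad operators:

    long stored = SparkStatsUtil.getRecordCount(poStore);  // store-side counter
    long loaded = SparkStatsUtil.getRecordCount(poLoad);   // load-side counter,
                                                           // split across co-loads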

Modified: pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGrunt.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGrunt.java Wed Apr 12 02:20:20 2017
@@ -1003,7 +1003,7 @@ public class TestGrunt {
             } else {
                 //In spark mode, We wrap ExecException to RunTimeException and is thrown out in JobGraphBuilder#sparkOperToRDD,
                 //So unwrap the exception here
-                assertTrue(((ExecException) e.getCause().getCause()).getErrorCode() == 6017);
+                assertTrue(((ExecException) e.getCause()).getErrorCode() == 6017);
             }
         }
 

Modified: pig/branches/spark/test/org/apache/pig/test/TestProjectRange.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestProjectRange.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestProjectRange.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestProjectRange.java Wed Apr 12 02:20:20 2017
@@ -651,7 +651,7 @@ public class TestProjectRange  {
                 };
         Schema s = pigServer.dumpSchema("f");
         Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
-        		Util.isSparkExecType(cluster.getExecType()));
+                Util.isSparkExecType(cluster.getExecType()));
     }
 
     /**
@@ -738,8 +738,8 @@ public class TestProjectRange  {
                         "(1,{(11,21,31,41,51),(10,20,30,40,50)})",
                 };
         Schema s = pigServer.dumpSchema("f");
-        Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s), 
-        		Util.isSparkExecType(cluster.getExecType()));
+        Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
+                Util.isSparkExecType(cluster.getExecType()));
 
     }
 
@@ -928,8 +928,8 @@ public class TestProjectRange  {
             "  g = group l1 by   .. c,  l2 by .. c;"
             ;
         String expectedSchStr = "grp: (a: int,b: long,c: int)," +
-        		"l1: {t : (a: int,b: long,c: int,d: int,e: int)}," +
-        		"l2: {t : (a: int,b: long,c: int,d: int,e: int)}";
+                "l1: {t : (a: int,b: long,c: int,d: int,e: int)}," +
+                "l2: {t : (a: int,b: long,c: int,d: int,e: int)}";
 
         Schema expectedSch = getCleanedGroupSchema(expectedSchStr);
         compileAndCompareSchema(expectedSch, query, "g");
@@ -948,8 +948,8 @@ public class TestProjectRange  {
                 };
         Iterator<Tuple> it = pigServer.openIterator("g");
         Schema s = pigServer.dumpSchema("g");
-        Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s), 
-        		Util.isSparkExecType(cluster.getExecType()));
+        Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
+                Util.isSparkExecType(cluster.getExecType()));
     }
 
     /**
@@ -1017,7 +1017,7 @@ public class TestProjectRange  {
         Iterator<Tuple> it = pigServer.openIterator("g");
         Schema s = pigServer.dumpSchema("g");
         Util.checkQueryOutputs(it,expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
-        		Util.isSparkExecType(cluster.getExecType()));
+                Util.isSparkExecType(cluster.getExecType()));
     }
 
     @Test
@@ -1068,8 +1068,8 @@ public class TestProjectRange  {
                 };
         Iterator<Tuple> it = pigServer.openIterator("lim");
         Schema s = pigServer.dumpSchema("lim");
-        Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s), 
-        		Util.isSparkExecType(cluster.getExecType()));
+        Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
+                Util.isSparkExecType(cluster.getExecType()));
     }
 
 
@@ -1132,7 +1132,7 @@ public class TestProjectRange  {
         Iterator<Tuple> it = pigServer.openIterator("g");
         Schema s = pigServer.dumpSchema("g");
         Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
-        		Util.isSparkExecType(cluster.getExecType()));
+                Util.isSparkExecType(cluster.getExecType()));
     }
 
     private void setAliasesToNull(Schema schema) {
@@ -1171,8 +1171,8 @@ public class TestProjectRange  {
                 };
         Iterator<Tuple> it = pigServer.openIterator("j");
         Schema s = pigServer.dumpSchema("j");
-        Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s), 
-        		Util.isSparkExecType(cluster.getExecType()));
+        Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
+                Util.isSparkExecType(cluster.getExecType()));
     }
 
     @Test
@@ -1200,8 +1200,8 @@ public class TestProjectRange  {
                 };
         Iterator<Tuple> it = pigServer.openIterator("j");
         Schema s = pigServer.dumpSchema("j");
-        Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s), 
-        		Util.isSparkExecType(cluster.getExecType()));
+        Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
+                Util.isSparkExecType(cluster.getExecType()));
     }
 
     @Test
@@ -1213,7 +1213,7 @@ public class TestProjectRange  {
             "  l2 = load '" + INP_FILE_5FIELDS +  "';" +
             "  g = cogroup l1 by  ($0 ..  ),  l2 by ($0 .. );";
         Util.checkExceptionMessage(query, "g", "Cogroup/Group by '*' or 'x..' " +
-        		"(range of columns to the end) " +
+                "(range of columns to the end) " +
                         "is only allowed if the input has a schema");
     }
 

Modified: pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java Wed Apr 12 02:20:20 2017
@@ -611,7 +611,7 @@ public class TestPruneColumn {
                 "({(5)},{})"
         };
         Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")),
-        		Util.isSparkExecType(Util.getLocalTestMode()));
+                Util.isSparkExecType(Util.getLocalTestMode()));
 
         assertTrue(emptyLogFileMessage());
     }
@@ -963,7 +963,7 @@ public class TestPruneColumn {
                 "((2,5,2))"
         };
         Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")),
-        		Util.isSparkExecType(Util.getLocalTestMode()));
+                Util.isSparkExecType(Util.getLocalTestMode()));
 
         assertTrue(emptyLogFileMessage());
     }
@@ -1513,7 +1513,7 @@ public class TestPruneColumn {
                 "({(3),(3),(3)},3)"
         };
         Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("G")),
-        		Util.isSparkExecType(Util.getLocalTestMode()));
+                Util.isSparkExecType(Util.getLocalTestMode()));
 
         assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"}));
         pigServer.getPigContext().getProperties().remove(PigImplConstants.PIG_OPTIMIZER_RULES_KEY);
@@ -1535,7 +1535,7 @@ public class TestPruneColumn {
                 "(5,{(2,5,2)})"
         };
         Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D"))
-        		,Util.isSparkExecType(Util.getLocalTestMode()));
+                , Util.isSparkExecType(Util.getLocalTestMode()));
 
         assertTrue(checkLogFileMessage(new String[]{"Columns pruned for B: $0"}));
     }
@@ -1865,7 +1865,7 @@ public class TestPruneColumn {
                 "(2,5,2,2)"
         };
         Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")),
-        		Util.isSparkExecType(Util.getLocalTestMode()));
+                Util.isSparkExecType(Util.getLocalTestMode()));
 
 
         assertTrue(emptyLogFileMessage());

Modified: pig/branches/spark/test/org/apache/pig/test/TestSchema.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestSchema.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestSchema.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestSchema.java Wed Apr 12 02:20:20 2017
@@ -29,7 +29,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.UUID;
 
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.data.DataType;
@@ -42,10 +44,28 @@ import org.apache.pig.impl.util.Utils;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
 import org.apache.pig.newplan.logical.relational.LogicalSchema.MergeMode;
 import org.apache.pig.parser.ParserException;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestSchema {
 
+    private static MiniGenericCluster cluster;
+    private static PigServer pigServer;
+
+    @BeforeClass
+    public static void setupTestCluster() throws Exception {
+        cluster = MiniGenericCluster.buildCluster();
+        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+    }
+
+    @AfterClass
+    public static void tearDownTestCluster() throws Exception {
+        cluster.shutDown();
+    }
+
     @Test
     public void testSchemaEqual1() {
 
@@ -660,8 +680,6 @@ public class TestSchema {
 
     @Test
     public void testSchemaSerialization() throws IOException {
-        MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
-        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         String inputFileName = "testSchemaSerialization-input.txt";
         String[] inputData = new String[] { "foo\t1", "hello\t2" };
         Util.createInputFile(cluster, inputFileName, inputData);
@@ -673,7 +691,6 @@ public class TestSchema {
             Tuple t = it.next();
             assertEquals("{a: {(f1: chararray,f2: int)}}", t.get(0));
         }
-        cluster.shutDown();
     }
 
     @Test
@@ -938,4 +955,79 @@ public class TestSchema {
             assertTrue(schemaString.equals(s2));
         }
     }
+
+    @Test
+    public void testDisabledDisambiguationContainsNoColons() throws IOException {
+        resetDisambiguationTestPropertyOverride();
+
+        String inputFileName = "testPrepend-input.txt";
+        String[] inputData = new String[]{"apple\t1\tred", "orange\t2\torange", "kiwi\t3\tgreen", "orange\t4\torange"};
+        Util.createInputFile(cluster, inputFileName, inputData);
+
+        String script = "A = LOAD '" + inputFileName + "' AS (fruit:chararray, foo:int, color: chararray);" +
+                "B = LOAD '" + inputFileName + "' AS (id:chararray, bar:int);" +
+                "C = GROUP A BY (fruit,color);" +
+                "D = FOREACH C GENERATE FLATTEN(group), AVG(A.foo);" +
+                "D2 = FOREACH C GENERATE FLATTEN(group), AVG(A.foo) as avgFoo;" +
+                "E = JOIN B BY id, D BY group::fruit;" +
+                "F = UNION ONSCHEMA B, D2;" +
+                "G = CROSS B, D2;";
+
+        Util.registerMultiLineQuery(pigServer, script);
+
+        //Prepending should happen with default settings
+        assertEquals("{B::id: chararray,B::bar: int,D::group::fruit: chararray,D::group::color: chararray,double}", pigServer.dumpSchema("E").toString());
+
+        //Override prepend property setting (check for flatten, join)
+        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE, "false");
+        assertEquals("{id: chararray,bar: int,fruit: chararray,color: chararray,double}", pigServer.dumpSchema("E").toString());
+        assertTrue(pigServer.openIterator("E").hasNext());
+
+        //Check for union and cross
+        assertEquals("{id: chararray,bar: int,fruit: chararray,color: chararray,avgFoo: double}", pigServer.dumpSchema("F").toString());
+        assertEquals("{id: chararray,bar: int,fruit: chararray,color: chararray,avgFoo: double}", pigServer.dumpSchema("G").toString());
+
+    }
+
+    @Test
+    public void testEnabledDisambiguationPassesForDupeAliases() throws IOException {
+        resetDisambiguationTestPropertyOverride();
+
+        checkForDupeAliases();
+
+        //Should pass with default settings
+        assertEquals("{A::id: chararray,A::val: int,B::id: chararray,B::val: int}", pigServer.dumpSchema("C").toString());
+        assertTrue(pigServer.openIterator("C").hasNext());
+    }
+
+    @Test
+    public void testDisabledDisambiguationFailsForDupeAliases() throws IOException {
+        resetDisambiguationTestPropertyOverride();
+
+        try {
+            checkForDupeAliases();
+            //Should fail with prepending disabled
+            pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE, "false");
+            pigServer.dumpSchema("C");
+        } catch (FrontendException e){
+            Assert.assertEquals("Duplicate schema alias: id in \"fake\"",e.getCause().getMessage());
+        }
+    }
+
+    private static void checkForDupeAliases() throws IOException {
+        String inputFileName = "testPrependFail-input" + UUID.randomUUID().toString() + ".txt";
+        String[] inputData = new String[]{"foo\t1", "bar\t2"};
+        Util.createInputFile(cluster, inputFileName, inputData);
+
+        String script = "A = LOAD '" + inputFileName + "' AS (id:chararray, val:int);" +
+                "B = LOAD '" + inputFileName + "' AS (id:chararray, val:int);" +
+                "C = JOIN A by id, B by id;";
+
+        Util.registerMultiLineQuery(pigServer, script);
+    }
+
+    private static void resetDisambiguationTestPropertyOverride() {
+        //Reset possible overrides
+        pigServer.getPigContext().getProperties().remove(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE);
+    }
 }
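
The new class-level fixture builds the mini cluster once for all tests, but tearDownTestCluster() only shuts down the cluster and leaves the shared PigServer open. An illustrative variant that also releases it (an assumption about intent, not part of this diff):

    @AfterClass
    public static void tearDownTestCluster() throws Exception {
        pigServer.shutdown();   // also release the shared PigServer
        cluster.shutDown();
    }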

Modified: pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java Wed Apr 12 02:20:20 2017
@@ -270,7 +270,7 @@ public abstract class TestSecondarySort
                 "(1,{(1,2,3),(1,2,4),(1,3,4)})"
         };
         Util.checkQueryOutputs(iter, expected, org.apache
-                .pig.newplan.logical.Util.translateSchema(s), 
+                .pig.newplan.logical.Util.translateSchema(s),
                 Util.isSparkExecType(Util.getLocalTestMode()));
         Util.deleteFile(cluster, clusterPath);
     }

Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java Wed Apr 12 02:20:20 2017
@@ -203,6 +203,25 @@ public class TestTezCompiler {
         resetScope();
         resetFileLocalizer();
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-6.gld");
+
+        // Three levels of splits - a, a1 and a2.
+        // One split above and one split below a1 which is the split to be replaced with tmp store.
+        query =
+                "a = load 'file:///tmp/input';" +
+                "store a into 'file:///tmp/pigoutput/Dir0';" +
+                "a1 = filter a by $0 == 5;" +
+                "store a1 into 'file:///tmp/pigoutput/Dir1';" +
+                "a2 = distinct a1;" +
+                "store a2 into 'file:///tmp/pigoutput/Dir2';" +
+                "a3 = group a2 by $0;" +
+                "store a3 into 'file:///tmp/pigoutput/Dir3';" +
+                "b = load 'file:///tmp/pigoutput/Dir3';" +
+                "c = join a1 by $0, b by $0;" +
+                "store c into 'file:///tmp/pigoutput/Dir4';";
+
+        resetScope();
+        resetFileLocalizer();
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-7.gld");
     }
 
     @Test