You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2016/07/08 02:35:08 UTC

svn commit: r1751847 - in /pig/branches/spark: src/org/apache/pig/backend/hadoop/executionengine/spark/ src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ src/org/apache/pig/backend/hadoop/executionengine/spark/operator/ src/org/apache/...

Author: xuefu
Date: Fri Jul  8 02:35:08 2016
New Revision: 1751847

URL: http://svn.apache.org/viewvc?rev=1751847&view=rev
Log:
Revert PIG-4797 and PIG-4944

Removed:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java
Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java
    pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1751847&r1=1751846&r2=1751847&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java Fri Jul  8 02:35:08 2016
@@ -23,8 +23,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -50,7 +48,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
-import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -241,8 +238,13 @@ public class JobGraphBuilder extends Spa
                                Set<OperatorKey> predsFromPreviousSparkOper)
             throws IOException {
         RDD<Tuple> nextRDD = null;
-        List<PhysicalOperator> predecessorsOfCurrentPhysicalOp = getPredecessors(plan, physicalOperator);
-        Set<OperatorKey> operatorKeysOfAllPreds = new LinkedHashSet<OperatorKey>();
+        List<PhysicalOperator> predecessorsOfCurrentPhysicalOp = plan
+                .getPredecessors(physicalOperator);
+        if (predecessorsOfCurrentPhysicalOp != null && predecessorsOfCurrentPhysicalOp.size() > 1) {
+            Collections.sort(predecessorsOfCurrentPhysicalOp);
+        }
+
+        Set<OperatorKey> operatorKeysOfAllPreds = new HashSet<OperatorKey>();
         addPredsFromPrevoiousSparkOp(sparkOperator, physicalOperator, operatorKeysOfAllPreds);
         if (predecessorsOfCurrentPhysicalOp != null) {
             for (PhysicalOperator predecessor : predecessorsOfCurrentPhysicalOp) {
@@ -294,29 +296,12 @@ public class JobGraphBuilder extends Spa
         }
     }
 
-    private List<PhysicalOperator> getPredecessors(PhysicalPlan plan, PhysicalOperator op) {
-        List preds = null;
-        if (!(op instanceof POJoinGroupSpark)) {
-            preds = plan.getPredecessors(op);
-            if (preds != null && preds.size() > 1) {
-                Collections.sort(preds);
-            }
-        } else {
-            //For POJoinGroupSpark, we could not use plan.getPredecessors(op)+ sort to get
-            //the predecessors with correct order, more detail see JoinOptimizerSpark#restructSparkOp
-            preds = ((POJoinGroupSpark) op).getPredecessors();
-        }
-        return preds;
-    }
-
     //get all rdds of predecessors sorted by the OperatorKey
     private List<RDD<Tuple>> sortPredecessorRDDs(Set<OperatorKey> operatorKeysOfAllPreds) {
         List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
-//        List<OperatorKey> operatorKeyOfAllPreds = Lists.newArrayList(operatorKeysOfAllPreds);
-//        Collections.sort(operatorKeyOfAllPreds);
-        //We need not sort operatorKeyOfAllPreds any more because operatorKeyOfAllPreds is LinkedHashSet
-        //which provides the order of insertion, before we insert element which is sorted by OperatorKey
-        for (OperatorKey operatorKeyOfAllPred : operatorKeysOfAllPreds) {
+        List<OperatorKey> operatorKeyOfAllPreds = Lists.newArrayList(operatorKeysOfAllPreds);
+        Collections.sort(operatorKeyOfAllPreds);
+        for (OperatorKey operatorKeyOfAllPred : operatorKeyOfAllPreds) {
             predecessorRDDs.add(physicalOpRdds.get(operatorKeyOfAllPred));
         }
         return predecessorRDDs;

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1751847&r1=1751846&r2=1751847&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Fri Jul  8 02:35:08 2016
@@ -78,7 +78,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.FilterConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.ForEachConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.GlobalRearrangeConverter;
-import org.apache.pig.backend.hadoop.executionengine.spark.converter.JoinGroupSparkConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.LocalRearrangeConverter;
@@ -95,11 +94,9 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
-import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
 import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
 import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.CombinerOptimizer;
-import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.JoinGroupOptimizerSpark;
 import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.MultiQueryOptimizerSpark;
 import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.NoopFilterRemover;
 import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.ParallelismSetter;
@@ -117,7 +114,6 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.JarManager;
-import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
@@ -203,7 +199,6 @@ public class SparkLauncher extends Launc
         convertMap.put(POPackage.class, new PackageConverter(confBytes));
         convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter());
         convertMap.put(POGlobalRearrangeSpark.class, new GlobalRearrangeConverter());
-	    convertMap.put(POJoinGroupSpark.class, new JoinGroupSparkConverter(confBytes));
         convertMap.put(POLimit.class, new LimitConverter());
         convertMap.put(PODistinct.class, new DistinctConverter());
         convertMap.put(POUnion.class, new UnionConverter(sparkContext.sc()));
@@ -224,14 +219,9 @@ public class SparkLauncher extends Launc
         new JobGraphBuilder(sparkplan, convertMap, sparkStats, sparkContext, jobMetricsListener, jobGroupID, jobConf, pigContext).visit();
         cleanUpSparkJob(sparkStats);
         sparkStats.finish();
-        resetUDFContext();
         return sparkStats;
     }
 
-    private void resetUDFContext() {
-        UDFContext.getUDFContext().addJobConf(null);
-    }
-
     private void uploadResources(SparkOperPlan sparkPlan) throws IOException {
         addFilesToSparkJob();
         addJarsToSparkJob(sparkPlan);
@@ -283,12 +273,6 @@ public class SparkLauncher extends Launc
             mqOptimizer.visit();
         }
 
-        //since JoinGroupOptimizerSpark modifies the plan and collapses LRA+GLA+PKG into POJoinGroupSpark while
-        //CombinerOptimizer collapses GLA+PKG into ReduceBy, so if JoinGroupOptimizerSpark first, the spark plan will be
-        //changed and not suitable for CombinerOptimizer.More detail see PIG-4797
-        JoinGroupOptimizerSpark joinOptimizer = new JoinGroupOptimizerSpark(plan);
-        joinOptimizer.visit();
-
         if (LOG.isDebugEnabled()) {
             System.out.println("after multiquery optimization:");
             explain(plan, System.out, "text", true);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java?rev=1751847&r1=1751846&r2=1751847&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java Fri Jul  8 02:35:08 2016
@@ -19,10 +19,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.Serializable;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 
 /**
@@ -31,12 +28,9 @@ import org.apache.pig.data.Tuple;
  * either empty (or is a tuple with one or more empty fields). In this case,
  * we must respect the SQL standard as documented in the equals() method.
  */
-public class IndexedKey implements Serializable, Comparable {
-    private static final Log LOG = LogFactory.getLog(IndexedKey.class);
+public class IndexedKey implements Serializable {
     private byte index;
     private Object key;
-    private boolean useSecondaryKey;
-    private boolean[] secondarySortOrder;
 
     public IndexedKey(byte index, Object key) {
         this.index = index;
@@ -146,40 +140,4 @@ public class IndexedKey implements Seria
         }
         return result;
     }
-
-    //firstly compare the index
-    //secondly compare the key
-    @Override
-    public int compareTo(Object o) {
-        IndexedKey that = (IndexedKey) o;
-        int res = index - that.getIndex();
-        if (res > 0) {
-            return 1;
-        } else if (res < 0) {
-            return -1;
-        } else {
-            if (useSecondaryKey) {
-                Tuple thisCompoundKey = (Tuple) key;
-                Tuple thatCompoundKey = (Tuple) that.getKey();
-                try {
-                    Object thisSecondary = thisCompoundKey.get(1);
-                    Object thatSecondaryKey = thatCompoundKey.get(1);
-                    return PigSecondaryKeyComparatorSpark.compareSecondaryKeys(thisSecondary, thatSecondaryKey, secondarySortOrder);
-
-                } catch (ExecException e) {
-                    throw new RuntimeException("IndexedKey#compareTo throws exception ", e);
-                }
-            } else {
-                return DataType.compare(key, that.getKey());
-            }
-        }
-    }
-
-    public void setUseSecondaryKey(boolean useSecondaryKey) {
-        this.useSecondaryKey = useSecondaryKey;
-    }
-
-    public void setSecondarySortOrder(boolean[] secondarySortOrder) {
-        this.secondarySortOrder = secondarySortOrder;
-    }
 }
\ No newline at end of file

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java?rev=1751847&r1=1751846&r2=1751847&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java Fri Jul  8 02:35:08 2016
@@ -66,7 +66,7 @@ class PigSecondaryKeyComparatorSpark imp
         }
     }
 
-    public static int compareSecondaryKeys(Object o1, Object o2, boolean[] asc) {
+    private int compareSecondaryKeys(Object o1, Object o2, boolean[] asc) {
         int rc = 0;
         if (o1 != null && o2 != null && o1 instanceof Tuple && o2 instanceof Tuple) {
             // objects are Tuples, we may need to apply sort order inside them

Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java?rev=1751847&r1=1751846&r2=1751847&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java Fri Jul  8 02:35:08 2016
@@ -65,7 +65,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class TestEvalPipelineLocal {
-
+    
     private PigServer pigServer;
 
     static final int MAX_SIZE = 100000;
@@ -1114,6 +1114,8 @@ public class TestEvalPipelineLocal {
     
     @Test
     public void testSetLocationCalledInFE() throws Exception {
+        // Need to reset it when running multiple testcases
+        UDFContext.getUDFContext().addJobConf(null);
         File f1 = createFile(new String[]{"a","b"});
         pigServer.registerQuery("a = load '" + Util.generateURI(f1.toString(), pigServer.getPigContext())
                 + "' using " + SetLocationTestLoadFunc.class.getName()