Posted to commits@pig.apache.org by xu...@apache.org on 2015/07/28 20:54:46 UTC

svn commit: r1693141 - in /pig/branches/spark: src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/ src/org/apache/pig/backend/hadoop/executionengine/spark/ src/org/apache/pig/backend/hadoop/executionengine/spark/plan/ src/org/apache/...

Author: xuefu
Date: Tue Jul 28 18:54:46 2015
New Revision: 1693141

URL: http://svn.apache.org/r1693141
Log:
PIG-4594: Enable TestMultiQuery in spark mode (Liyun via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
    pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java
    pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=1693141&r1=1693140&r2=1693141&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Tue Jul 28 18:54:46 2015
@@ -143,6 +143,18 @@ public class PhysicalPlan extends Operat
         to.setInputs(getPredecessors(to));
     }
 
+    /**
+     * Connect from and to while skipping the checks that connect() performs, i.e. whether the
+     * from operator supports multiple outputs and whether the to operator supports multiple inputs.
+     *
+     * @param from Operator data will flow from.
+     * @param to   Operator data will flow to.
+     */
+    public void forceConnect(PhysicalOperator from, PhysicalOperator to) throws PlanException {
+        super.forceConnect(from, to);
+        to.setInputs(getPredecessors(to));
+    }
+
     /*public void connect(List<PhysicalOperator> from, PhysicalOperator to) throws IOException{
         if(!to.supportsMultipleInputs()){
             throw new IOException("Invalid Operation on " + to.name() + ". It doesn't support multiple inputs.");

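A minimal sketch of how the new method can be used (the operator keys and wiring below are illustrative, not taken from this patch): forceConnect() adds the edge without the multiple-output/multiple-input checks of connect(), then refreshes the input list of the downstream operator just as connect() does.

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
    import org.apache.pig.impl.plan.OperatorKey;
    import org.apache.pig.impl.plan.PlanException;

    public class ForceConnectSketch {
        public static void main(String[] args) throws PlanException {
            PhysicalPlan plan = new PhysicalPlan();
            // hypothetical scope/id values, used only to build standalone operators
            POSplit split = new POSplit(new OperatorKey("scope", 1));
            POStore store = new POStore(new OperatorKey("scope", 2));
            plan.add(split);
            plan.add(store);
            // connect() would first check supportsMultipleOutputs()/supportsMultipleInputs();
            // forceConnect() skips those checks and still calls setInputs() on 'store'
            plan.forceConnect(split, store);
        }
    }
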
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1693141&r1=1693140&r2=1693141&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Tue Jul 28 18:54:46 2015
@@ -17,9 +17,6 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.spark;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
@@ -43,7 +40,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
-
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
@@ -67,15 +63,36 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.spark.converter.*;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.CollectedGroupConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.CounterConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.DistinctConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.FRJoinConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.FilterConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.ForEachConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.GlobalRearrangeConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.LocalRearrangeConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeJoinConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.RankConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.SkewedJoinConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.SortConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.SplitConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.StoreConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
 import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
-import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.SecondaryKeyOptimizerSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.MultiQueryOptimizerSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.NoopFilterRemover;
 import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.ParallelismSetter;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.SecondaryKeyOptimizerSpark;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -97,7 +114,9 @@ import org.apache.spark.api.java.JavaSpa
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.scheduler.JobLogger;
 import org.apache.spark.scheduler.StatsReportListener;
-import org.apache.spark.SparkException;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
 
 /**
  * Main class that launches pig for Spark
@@ -188,7 +207,7 @@ public class SparkLauncher extends Launc
         return sparkStats;
     }
 
-    private void optimize(PigContext pc, SparkOperPlan plan) throws VisitorException {
+    private void optimize(PigContext pc, SparkOperPlan plan) throws IOException {
         String prop = pc.getProperties().getProperty(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY);
         if (!pc.inIllustrator && !("true".equals(prop))) {
             SecondaryKeyOptimizerSpark skOptimizer = new SecondaryKeyOptimizerSpark(plan);
@@ -201,6 +220,31 @@ public class SparkLauncher extends Launc
             AccumulatorOptimizer accum = new AccumulatorOptimizer(plan);
             accum.visit();
         }
+
+        // removes the filter(constant(true)) operators introduced by
+        // splits.
+        NoopFilterRemover fRem = new NoopFilterRemover(plan);
+        fRem.visit();
+
+        boolean isMultiQuery =
+                Boolean.valueOf(pc.getProperties().getProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "true"));
+
+        if (LOG.isDebugEnabled()) {
+            System.out.println("before multiquery optimization:");
+            explain(plan, System.out, "text", true);
+        }
+
+        if (isMultiQuery) {
+            // reduces the number of SparkOperators in the Spark plan generated
+            // by a multi-query (multi-store) script.
+            MultiQueryOptimizerSpark mqOptimizer = new MultiQueryOptimizerSpark(plan);
+            mqOptimizer.visit();
+        }
+
+        if (LOG.isDebugEnabled()) {
+            System.out.println("after multiquery optimization:");
+            explain(plan, System.out, "text", true);
+        }
     }
 
     /**
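
For reference, multi-query optimization is on by default and is controlled by PigConfiguration.PIG_OPT_MULTIQUERY, as the code above shows. A hedged sketch of opting out programmatically (paths, aliases, and the local exec type are illustrative only):

    import java.util.Properties;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigConfiguration;
    import org.apache.pig.PigServer;

    public class MultiQueryToggleSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "false"); // default is "true"
            PigServer pig = new PigServer(ExecType.LOCAL, props);
            pig.setBatchOn();
            pig.registerQuery("A = load 'input' as (a:int, b:int);");
            pig.registerQuery("store A into 'output1';");
            pig.registerQuery("store A into 'output2';"); // two stores: a multi-query script
            pig.executeBatch();
        }
    }
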
@@ -230,7 +274,6 @@ public class SparkLauncher extends Launc
                     + " in this call to getJobIdsForGroup, but got "
                     + unseenJobIDs.size());
         }
-
         seenJobIDs.addAll(unseenJobIDs);
         return unseenJobIDs;
     }
@@ -491,51 +534,41 @@ public class SparkLauncher extends Launc
         List<PhysicalOperator> leafPOs = sparkOperator.physicalPlan.getLeaves();
         boolean isFail = false;
         Exception exception = null;
-        if (leafPOs != null && leafPOs.size() != 1) {
-            throw new IllegalArgumentException(
-                    String.format(
-                            "sparkOperator "
-                                    + ".physicalPlan should have 1 leaf, but  sparkOperator"
-                                    + ".physicalPlan.getLeaves():{} not equals 1, sparkOperator"
-                                    + "sparkOperator:{}",
-                            sparkOperator.physicalPlan.getLeaves().size(),
-                            sparkOperator.name()));
-        }
-
-        PhysicalOperator leafPO = leafPOs.get(0);
-        try {
-            physicalToRDD(sparkOperator.physicalPlan, leafPO, physicalOpRdds,
-                    predecessorRDDs, convertMap);
-            sparkOpRdds.put(sparkOperator.getOperatorKey(),
-                    physicalOpRdds.get(leafPO.getOperatorKey()));
-        } catch (Exception e) {
-            if (e instanceof SparkException) {
-                LOG.info("throw SparkException, error founds when running " +
-                        "rdds in spark");
+        // One SparkOperator may have multiple leaves (POStores) after the multi-query feature is enabled
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("sparkOperator.physicalPlan has " + sparkOperator.physicalPlan.getLeaves().size() + " leaves");
+        }
+        for (PhysicalOperator leafPO : leafPOs) {
+            try {
+                physicalToRDD(sparkOperator.physicalPlan, leafPO, physicalOpRdds,
+                        predecessorRDDs, convertMap);
+                sparkOpRdds.put(sparkOperator.getOperatorKey(),
+                        physicalOpRdds.get(leafPO.getOperatorKey()));
+            } catch (Exception e) {
+                LOG.error("Exception caught in sparkOperToRDD: ", e);
+                exception = e;
+                isFail = true;
             }
-            exception = e;
-            isFail = true;
         }
 
         List<POStore> poStores = PlanHelper.getPhysicalOperators(
                 sparkOperator.physicalPlan, POStore.class);
-        if (poStores != null && poStores.size() == 1) {
-            POStore poStore = poStores.get(0);
+        Collections.sort(poStores);
+        if (poStores.size() > 0) {
+            int i = 0;
             if (!isFail) {
-                for (int jobID : getJobIDs(seenJobIDs)) {
-                    SparkStatsUtil.waitForJobAddStats(jobID, poStore, sparkOperator,
+                List<Integer> jobIDs = getJobIDs(seenJobIDs);
+                for (POStore poStore : poStores) {
+                    SparkStatsUtil.waitForJobAddStats(jobIDs.get(i++), poStore, sparkOperator,
                             jobMetricsListener, sparkContext, sparkStats, conf);
                 }
             } else {
-                String failJobID = sparkOperator.name().concat("_fail");
-                SparkStatsUtil.addFailJobStats(failJobID, poStore, sparkOperator, sparkStats,
-                        conf, exception);
+                for (POStore poStore : poStores) {
+                    String failJobID = sparkOperator.name().concat("_fail");
+                    SparkStatsUtil.addFailJobStats(failJobID, poStore, sparkOperator, sparkStats,
+                            conf, exception);
+                }
             }
-        } else {
-            LOG.info(String
-                    .format(String.format("sparkOperator:{} does not have POStore or" +
-                                    " sparkOperator has more than 1 POStore. {} is the size of POStore."),
-                            sparkOperator.name(), poStores.size()));
         }
     }
 
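The shape that motivates this loop: after the multi-query merge, one SparkOperator's physicalPlan carries one subtree per store, either as several direct leaves or tucked inside a POSplit's nested plans (handled in physicalToRDD below). Illustratively:

    Load ── Split ──┬── Filter ── Store(output1)
                    └── Filter ── Store(output2)

Collections.sort(poStores) yields a deterministic order (physical operators compare by their OperatorKey), which is what lets the i-th job ID returned by getJobIDs pair up with the i-th POStore when recording stats.
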
@@ -567,24 +600,37 @@ public class SparkLauncher extends Launc
             }
         }
 
-        RDDConverter converter = convertMap.get(physicalOperator.getClass());
-        if (converter == null) {
-            throw new IllegalArgumentException(
-                    "Pig on Spark does not support Physical Operator: " + physicalOperator);
-        }
+        if (physicalOperator instanceof POSplit) {
+            List<PhysicalPlan> successorPlans = ((POSplit) physicalOperator).getPlans();
+            for (PhysicalPlan successPlan : successorPlans) {
+                List<PhysicalOperator> leavesOfSuccessPlan = successPlan.getLeaves();
+                if (leavesOfSuccessPlan.size() != 1) {
+                    LOG.error("the successor plan of POSplit should have exactly 1 leaf, but found " + leavesOfSuccessPlan.size());
+                    break;
+                }
+                PhysicalOperator leafOfSuccessPlan = leavesOfSuccessPlan.get(0);
+                physicalToRDD(successPlan, leafOfSuccessPlan, rdds, predecessorRdds, convertMap);
+            }
+        } else {
+            RDDConverter converter = convertMap.get(physicalOperator.getClass());
+            if (converter == null) {
+                throw new IllegalArgumentException(
+                        "Pig on Spark does not support Physical Operator: " + physicalOperator);
+            }
 
-        LOG.info("Converting operator "
-                + physicalOperator.getClass().getSimpleName() + " "
-                + physicalOperator);
-        nextRDD = converter.convert(predecessorRdds, physicalOperator);
+            LOG.info("Converting operator "
+                    + physicalOperator.getClass().getSimpleName() + " "
+                    + physicalOperator);
+            nextRDD = converter.convert(predecessorRdds, physicalOperator);
 
-        if (nextRDD == null) {
-            throw new IllegalArgumentException(
-                    "RDD should not be null after PhysicalOperator: "
-                            + physicalOperator);
-        }
+            if (nextRDD == null) {
+                throw new IllegalArgumentException(
+                        "RDD should not be null after PhysicalOperator: "
+                                + physicalOperator);
+            }
 
-        rdds.put(physicalOperator.getOperatorKey(), nextRDD);
+            rdds.put(physicalOperator.getOperatorKey(), nextRDD);
+        }
     }
 
     @Override

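A note on the POSplit branch above: a POSplit's successors are not edges in the enclosing plan. POSplit.getPlans() returns one nested PhysicalPlan per output branch, each expected to end in exactly one leaf (e.g. a POStore), so conversion recurses into those nested plans instead of looking up an RDDConverter for the split itself.
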
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java?rev=1693141&r1=1693140&r2=1693141&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java Tue Jul 28 18:54:46 2015
@@ -245,4 +245,23 @@ public class SparkOperator extends Opera
     public void markLimitAfterSort() {
         feature.set(OPER_FEATURE.LIMIT_AFTER_SORT.ordinal());
     }
+
+    public void copyFeatures(SparkOperator copyFrom, List<OPER_FEATURE> excludeFeatures) {
+        for (OPER_FEATURE opf : OPER_FEATURE.values()) {
+            if (excludeFeatures != null && excludeFeatures.contains(opf)) {
+                continue;
+            }
+            if (copyFrom.feature.get(opf.ordinal())) {
+                feature.set(opf.ordinal());
+            }
+        }
+    }
+
+    public void setRequestedParallelism(int requestedParallelism) {
+        this.requestedParallelism = requestedParallelism;
+    }
+
+    public void setRequestedParallelismByReference(SparkOperator oper) {
+        this.requestedParallelism = oper.requestedParallelism;
+    }
 }

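These helpers read as plumbing for the merge step. A hedged sketch of how an optimizer might use them when folding one SparkOperator into another (the merge routine below is hypothetical, not part of this diff):

    // Hypothetical merge helper; 'merged' absorbs 'absorbed'.
    static void mergeInto(SparkOperator merged, SparkOperator absorbed) {
        merged.copyFeatures(absorbed, null);                 // null excludeFeatures = copy every flag
        merged.setRequestedParallelismByReference(absorbed); // inherit the absorbed operator's parallelism
    }
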
Modified: pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=1693141&r1=1693140&r2=1693141&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java Tue Jul 28 18:54:46 2015
@@ -163,6 +163,25 @@ public abstract class OperatorPlan<E ext
     }
 
     /**
+     * Connect from and to while skipping the checks that connect() performs, i.e. whether the
+     * from operator supports multiple outputs and whether the to operator supports multiple inputs.
+     *
+     * @param from Operator data will flow from.
+     * @param to   Operator data will flow to.
+     * @throws PlanException if from or to is not in the plan
+     */
+    public void forceConnect(E from, E to) throws PlanException {
+        markDirty();
+
+        // Check that both nodes are in the plan.
+        checkInPlan(from);
+        checkInPlan(to);
+        mFromEdges.put(from, to);
+        mToEdges.put(to, from);
+    }
+
+
+    /**
      * Create an edge between two nodes.  The direction of the edge implies data
      * flow.
      * @param from Operator data will flow from.

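Note that forceConnect() still validates plan membership through checkInPlan(), and because mFromEdges/mToEdges are MultiMaps, each call appends an edge rather than replacing one. A sketch of the behavioral difference (op1..op3 stand in for operators whose supportsMultipleOutputs() returns false):

    plan.connect(op1, op2);       // fine: first outgoing edge
    // plan.connect(op1, op3);    // would throw PlanException: op1 does not
                                  //   support multiple outputs
    plan.forceConnect(op1, op3);  // adds the second outgoing edge regardless
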
Modified: pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java?rev=1693141&r1=1693140&r2=1693141&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java Tue Jul 28 18:54:46 2015
@@ -30,6 +30,7 @@ import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -106,33 +107,23 @@ public class TestMultiQuery {
         myPig.registerQuery("E = load 'output1' as (a:int, b:int);");
         Iterator<Tuple> iter = myPig.openIterator("E");
 
-        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] {
+        String[] expectedResults = new String[]{
                         "(1,2)",
                         "(2,3)"
-                });
+        };
+        Schema s = myPig.dumpSchema("E");
+        Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s));
 
-        int counter = 0;
-        while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
-        }
-        assertEquals(expectedResults.size(), counter);
 
         myPig.registerQuery("E = load 'output2' as (a:int, b:int);");
         iter = myPig.openIterator("E");
 
-        expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] {
+        expectedResults = new String[]{
                         "(2,3)",
                         "(3,4)"
-                });
-
-        counter = 0;
-        while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
-        }
-
-        assertEquals(expectedResults.size(), counter);
+        };
+        s = myPig.dumpSchema("E");
+        Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s));
     }
 
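(The same pattern repeats through the rest of the file: the order-sensitive iterate-and-assertEquals loops are replaced with Util.checkQueryOutputsAfterSortRecursive, which compares results after sorting. That makes the assertions independent of row order, which is not guaranteed in Spark mode.)
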
     @Test
@@ -165,20 +156,14 @@ public class TestMultiQuery {
 
         Iterator<Tuple> iter = myPig.openIterator("F");
 
-        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] {
+        String[] expectedResults = new String[]{
                         "(1,2)",
                         "(2,3)",
                         "(3,5)",
                         "(5,6)"
-                });
-
-        int counter = 0;
-        while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
-        }
-
-        assertEquals(expectedResults.size(), counter);
+        };
+        Schema s = myPig.dumpSchema("F");
+        Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s));
     }
 
     @Test
@@ -299,19 +284,13 @@ public class TestMultiQuery {
 
         Iterator<Tuple> iter = myPig.openIterator("E");
 
-        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] {
-                        "(1L,'apple',3,1L,'apple',1L,{(1L)})",
-                        "(2L,'orange',4,2L,'orange',2L,{(2L)})",
-                        "(3L,'persimmon',5,3L,'persimmon',3L,{(3L)})"
-                });
-
-        int counter = 0;
-        while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
-        }
-
-        assertEquals(expectedResults.size(), counter);
+        String[] expectedResults = new String[]{
+                "(1L,apple,3,1L,apple,1L,{(1L)})",
+                "(2L,orange,4,2L,orange,2L,{(2L)})",
+                "(3L,persimmon,5,3L,persimmon,3L,{(3L)})"
+        };
+        Schema s = myPig.dumpSchema("E");
+        Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s));
     }
 
     @Test
@@ -345,19 +324,13 @@ public class TestMultiQuery {
 
         Iterator<Tuple> iter = myPig.openIterator("joined_session_info");
 
-        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] {
-                        "('apple',{},{('apple','jar',1L)})",
-                        "('orange',{},{('orange','box',1L)})",
-                        "('strawberry',{(30,'strawberry','quit','bot')},{})"
-                });
-
-        int counter = 0;
-        while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
-        }
+        String[] expectedResults = new String[]{
+                "(apple,{},{(apple,jar,1L)})",
+                "(orange,{},{(orange,box,1L)})",
+                "(strawberry,{(30,strawberry,quit,bot)},{})"};
 
-        assertEquals(expectedResults.size(), counter);
+        Schema s = myPig.dumpSchema("joined_session_info");
+        Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s));
     }
 
     @Test