You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2015/07/28 20:54:46 UTC
svn commit: r1693141 - in /pig/branches/spark:
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/
src/org/apache/pig/backend/hadoop/executionengine/spark/
src/org/apache/pig/backend/hadoop/executionengine/spark/plan/
src/org/apache/...
Author: xuefu
Date: Tue Jul 28 18:54:46 2015
New Revision: 1693141
URL: http://svn.apache.org/r1693141
Log:
PIG-4594: Enable TestMultiQuery in spark mode (Liyun via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java
pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=1693141&r1=1693140&r2=1693141&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Tue Jul 28 18:54:46 2015
@@ -143,6 +143,18 @@ public class PhysicalPlan extends Operat
to.setInputs(getPredecessors(to));
}
+ /**
+ * Connects from and to while skipping some checks, such as whether the from operator supports
+ * multiple outputs and whether the to operator supports multiple inputs.
+ *
+ * @param from
+ * @param to
+ */
+ public void forceConnect(PhysicalOperator from, PhysicalOperator to) throws PlanException {
+ super.forceConnect(from, to);
+ to.setInputs(getPredecessors(to));
+ }
+
/*public void connect(List<PhysicalOperator> from, PhysicalOperator to) throws IOException{
if(!to.supportsMultipleInputs()){
throw new IOException("Invalid Operation on " + to.name() + ". It doesn't support multiple inputs.");
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1693141&r1=1693140&r2=1693141&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Tue Jul 28 18:54:46 2015
@@ -17,9 +17,6 @@
*/
package org.apache.pig.backend.hadoop.executionengine.spark;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
@@ -43,7 +40,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
-
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigConstants;
import org.apache.pig.PigException;
@@ -67,15 +63,36 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.spark.converter.*;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.CollectedGroupConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.CounterConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.DistinctConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.FRJoinConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.FilterConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.ForEachConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.GlobalRearrangeConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.LocalRearrangeConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeJoinConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.RankConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.SkewedJoinConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.SortConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.SplitConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.StoreConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
-import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.SecondaryKeyOptimizerSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.MultiQueryOptimizerSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.NoopFilterRemover;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.ParallelismSetter;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.SecondaryKeyOptimizerSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -97,7 +114,9 @@ import org.apache.spark.api.java.JavaSpa
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.JobLogger;
import org.apache.spark.scheduler.StatsReportListener;
-import org.apache.spark.SparkException;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
/**
* Main class that launches pig for Spark
@@ -188,7 +207,7 @@ public class SparkLauncher extends Launc
return sparkStats;
}
- private void optimize(PigContext pc, SparkOperPlan plan) throws VisitorException {
+ private void optimize(PigContext pc, SparkOperPlan plan) throws IOException {
String prop = pc.getProperties().getProperty(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY);
if (!pc.inIllustrator && !("true".equals(prop))) {
SecondaryKeyOptimizerSpark skOptimizer = new SecondaryKeyOptimizerSpark(plan);
@@ -201,6 +220,31 @@ public class SparkLauncher extends Launc
AccumulatorOptimizer accum = new AccumulatorOptimizer(plan);
accum.visit();
}
+
+ // removes the filter(constant(true)) operators introduced by
+ // splits.
+ NoopFilterRemover fRem = new NoopFilterRemover(plan);
+ fRem.visit();
+
+ boolean isMultiQuery =
+ Boolean.valueOf(pc.getProperties().getProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "true"));
+
+ if (LOG.isDebugEnabled()) {
+ System.out.println("before multiquery optimization:");
+ explain(plan, System.out, "text", true);
+ }
+
+ if (isMultiQuery) {
+ // reduces the number of SparkOpers in the Spark plan generated
+ // by multi-query (multi-store) script.
+ MultiQueryOptimizerSpark mqOptimizer = new MultiQueryOptimizerSpark(plan);
+ mqOptimizer.visit();
+ }
+
+ if (LOG.isDebugEnabled()) {
+ System.out.println("after multiquery optimization:");
+ explain(plan, System.out, "text", true);
+ }
}
/**
@@ -230,7 +274,6 @@ public class SparkLauncher extends Launc
+ " in this call to getJobIdsForGroup, but got "
+ unseenJobIDs.size());
}
-
seenJobIDs.addAll(unseenJobIDs);
return unseenJobIDs;
}
@@ -491,51 +534,41 @@ public class SparkLauncher extends Launc
List<PhysicalOperator> leafPOs = sparkOperator.physicalPlan.getLeaves();
boolean isFail = false;
Exception exception = null;
- if (leafPOs != null && leafPOs.size() != 1) {
- throw new IllegalArgumentException(
- String.format(
- "sparkOperator "
- + ".physicalPlan should have 1 leaf, but sparkOperator"
- + ".physicalPlan.getLeaves():{} not equals 1, sparkOperator"
- + "sparkOperator:{}",
- sparkOperator.physicalPlan.getLeaves().size(),
- sparkOperator.name()));
- }
-
- PhysicalOperator leafPO = leafPOs.get(0);
- try {
- physicalToRDD(sparkOperator.physicalPlan, leafPO, physicalOpRdds,
- predecessorRDDs, convertMap);
- sparkOpRdds.put(sparkOperator.getOperatorKey(),
- physicalOpRdds.get(leafPO.getOperatorKey()));
- } catch (Exception e) {
- if (e instanceof SparkException) {
- LOG.info("throw SparkException, error founds when running " +
- "rdds in spark");
+ // One SparkOperator may have multiple leaves (POStores) after the multiquery feature is enabled
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("sparkOperator.physicalPlan have " + sparkOperator.physicalPlan.getLeaves().size() + " leaves");
+ }
+ for (PhysicalOperator leafPO : leafPOs) {
+ try {
+ physicalToRDD(sparkOperator.physicalPlan, leafPO, physicalOpRdds,
+ predecessorRDDs, convertMap);
+ sparkOpRdds.put(sparkOperator.getOperatorKey(),
+ physicalOpRdds.get(leafPO.getOperatorKey()));
+ } catch (Exception e) {
+ LOG.error("throw exception in sparkOperToRDD: ", e);
+ exception = e;
+ isFail = true;
}
- exception = e;
- isFail = true;
}
List<POStore> poStores = PlanHelper.getPhysicalOperators(
sparkOperator.physicalPlan, POStore.class);
- if (poStores != null && poStores.size() == 1) {
- POStore poStore = poStores.get(0);
+ Collections.sort(poStores);
+ if (poStores.size() > 0) {
+ int i = 0;
if (!isFail) {
- for (int jobID : getJobIDs(seenJobIDs)) {
- SparkStatsUtil.waitForJobAddStats(jobID, poStore, sparkOperator,
+ List<Integer> jobIDs = getJobIDs(seenJobIDs);
+ for (POStore poStore : poStores) {
+ SparkStatsUtil.waitForJobAddStats(jobIDs.get(i++), poStore, sparkOperator,
jobMetricsListener, sparkContext, sparkStats, conf);
}
} else {
- String failJobID = sparkOperator.name().concat("_fail");
- SparkStatsUtil.addFailJobStats(failJobID, poStore, sparkOperator, sparkStats,
- conf, exception);
+ for (POStore poStore : poStores) {
+ String failJobID = sparkOperator.name().concat("_fail");
+ SparkStatsUtil.addFailJobStats(failJobID, poStore, sparkOperator, sparkStats,
+ conf, exception);
+ }
}
- } else {
- LOG.info(String
- .format(String.format("sparkOperator:{} does not have POStore or" +
- " sparkOperator has more than 1 POStore. {} is the size of POStore."),
- sparkOperator.name(), poStores.size()));
}
}
@@ -567,24 +600,37 @@ public class SparkLauncher extends Launc
}
}
- RDDConverter converter = convertMap.get(physicalOperator.getClass());
- if (converter == null) {
- throw new IllegalArgumentException(
- "Pig on Spark does not support Physical Operator: " + physicalOperator);
- }
+ if (physicalOperator instanceof POSplit) {
+ List<PhysicalPlan> successorPlans = ((POSplit) physicalOperator).getPlans();
+ for (PhysicalPlan successPlan : successorPlans) {
+ List<PhysicalOperator> leavesOfSuccessPlan = successPlan.getLeaves();
+ if (leavesOfSuccessPlan.size() != 1) {
+ LOG.error("the size of leaves of SuccessPlan should be 1");
+ break;
+ }
+ PhysicalOperator leafOfSuccessPlan = leavesOfSuccessPlan.get(0);
+ physicalToRDD(successPlan, leafOfSuccessPlan, rdds, predecessorRdds, convertMap);
+ }
+ } else {
+ RDDConverter converter = convertMap.get(physicalOperator.getClass());
+ if (converter == null) {
+ throw new IllegalArgumentException(
+ "Pig on Spark does not support Physical Operator: " + physicalOperator);
+ }
- LOG.info("Converting operator "
- + physicalOperator.getClass().getSimpleName() + " "
- + physicalOperator);
- nextRDD = converter.convert(predecessorRdds, physicalOperator);
+ LOG.info("Converting operator "
+ + physicalOperator.getClass().getSimpleName() + " "
+ + physicalOperator);
+ nextRDD = converter.convert(predecessorRdds, physicalOperator);
- if (nextRDD == null) {
- throw new IllegalArgumentException(
- "RDD should not be null after PhysicalOperator: "
- + physicalOperator);
- }
+ if (nextRDD == null) {
+ throw new IllegalArgumentException(
+ "RDD should not be null after PhysicalOperator: "
+ + physicalOperator);
+ }
- rdds.put(physicalOperator.getOperatorKey(), nextRDD);
+ rdds.put(physicalOperator.getOperatorKey(), nextRDD);
+ }
}
@Override
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java?rev=1693141&r1=1693140&r2=1693141&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java Tue Jul 28 18:54:46 2015
@@ -245,4 +245,23 @@ public class SparkOperator extends Opera
public void markLimitAfterSort() {
feature.set(OPER_FEATURE.LIMIT_AFTER_SORT.ordinal());
}
+
+ public void copyFeatures(SparkOperator copyFrom, List<OPER_FEATURE> excludeFeatures) {
+ for (OPER_FEATURE opf : OPER_FEATURE.values()) {
+ if (excludeFeatures != null && excludeFeatures.contains(opf)) {
+ continue;
+ }
+ if (copyFrom.feature.get(opf.ordinal())) {
+ feature.set(opf.ordinal());
+ }
+ }
+ }
+
+ public void setRequestedParallelism(int requestedParallelism) {
+ this.requestedParallelism = requestedParallelism;
+ }
+
+ public void setRequestedParallelismByReference(SparkOperator oper) {
+ this.requestedParallelism = oper.requestedParallelism;
+ }
}
Modified: pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=1693141&r1=1693140&r2=1693141&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java Tue Jul 28 18:54:46 2015
@@ -163,6 +163,25 @@ public abstract class OperatorPlan<E ext
}
/**
+ * Connects from and to while skipping some checks, such as whether the from operator supports
+ * multiple outputs and whether the to operator supports multiple inputs.
+ *
+ * @param from Operator data will flow from.
+ * @param to Operator data will flow to.
+ * @throws PlanException if from or to is not in the plan
+ */
+ public void forceConnect(E from, E to) throws PlanException {
+ markDirty();
+
+ // Check that both nodes are in the plan.
+ checkInPlan(from);
+ checkInPlan(to);
+ mFromEdges.put(from, to);
+ mToEdges.put(to, from);
+ }
+
+
+ /**
* Create an edge between two nodes. The direction of the edge implies data
* flow.
* @param from Operator data will flow from.
Modified: pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java?rev=1693141&r1=1693140&r2=1693141&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java Tue Jul 28 18:54:46 2015
@@ -30,6 +30,7 @@ import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -106,33 +107,23 @@ public class TestMultiQuery {
myPig.registerQuery("E = load 'output1' as (a:int, b:int);");
Iterator<Tuple> iter = myPig.openIterator("E");
- List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
+ String[] expectedResults = new String[]{
"(1,2)",
"(2,3)"
- });
+ };
+ Schema s = myPig.dumpSchema("E");
+ Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s));
- int counter = 0;
- while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
- }
- assertEquals(expectedResults.size(), counter);
myPig.registerQuery("E = load 'output2' as (a:int, b:int);");
iter = myPig.openIterator("E");
- expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
+ expectedResults = new String[]{
"(2,3)",
"(3,4)"
- });
-
- counter = 0;
- while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
- }
-
- assertEquals(expectedResults.size(), counter);
+ };
+ s = myPig.dumpSchema("E");
+ Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s));
}
@Test
@@ -165,20 +156,14 @@ public class TestMultiQuery {
Iterator<Tuple> iter = myPig.openIterator("F");
- List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
+ String[] expectedResults = new String[]{
"(1,2)",
"(2,3)",
"(3,5)",
"(5,6)"
- });
-
- int counter = 0;
- while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
- }
-
- assertEquals(expectedResults.size(), counter);
+ };
+ Schema s = myPig.dumpSchema("F");
+ Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s));
}
@Test
@@ -299,19 +284,13 @@ public class TestMultiQuery {
Iterator<Tuple> iter = myPig.openIterator("E");
- List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
- "(1L,'apple',3,1L,'apple',1L,{(1L)})",
- "(2L,'orange',4,2L,'orange',2L,{(2L)})",
- "(3L,'persimmon',5,3L,'persimmon',3L,{(3L)})"
- });
-
- int counter = 0;
- while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
- }
-
- assertEquals(expectedResults.size(), counter);
+ String[] expectedResults = new String[]{
+ "(1L,apple,3,1L,apple,1L,{(1L)})",
+ "(2L,orange,4,2L,orange,2L,{(2L)})",
+ "(3L,persimmon,5,3L,persimmon,3L,{(3L)})"
+ };
+ Schema s = myPig.dumpSchema("E");
+ Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s));
}
@Test
@@ -345,19 +324,13 @@ public class TestMultiQuery {
Iterator<Tuple> iter = myPig.openIterator("joined_session_info");
- List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
- "('apple',{},{('apple','jar',1L)})",
- "('orange',{},{('orange','box',1L)})",
- "('strawberry',{(30,'strawberry','quit','bot')},{})"
- });
-
- int counter = 0;
- while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
- }
+ String[] expectedResults = new String[]{
+ "(apple,{},{(apple,jar,1L)})",
+ "(orange,{},{(orange,box,1L)})",
+ "(strawberry,{(30,strawberry,quit,bot)},{})"};
- assertEquals(expectedResults.size(), counter);
+ Schema s = myPig.dumpSchema("joined_session_info");
+ Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s));
}
@Test