You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2016/07/08 02:35:08 UTC
svn commit: r1751847 - in /pig/branches/spark:
src/org/apache/pig/backend/hadoop/executionengine/spark/
src/org/apache/pig/backend/hadoop/executionengine/spark/converter/
src/org/apache/pig/backend/hadoop/executionengine/spark/operator/
src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/
test/org/apache/pig/test/
Author: xuefu
Date: Fri Jul 8 02:35:08 2016
New Revision: 1751847
URL: http://svn.apache.org/viewvc?rev=1751847&view=rev
Log:
Revert PIG-4797 and PIG-4944
Removed:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java
pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1751847&r1=1751846&r2=1751847&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java Fri Jul 8 02:35:08 2016
@@ -23,8 +23,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -50,7 +48,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
-import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -241,8 +238,13 @@ public class JobGraphBuilder extends Spa
Set<OperatorKey> predsFromPreviousSparkOper)
throws IOException {
RDD<Tuple> nextRDD = null;
- List<PhysicalOperator> predecessorsOfCurrentPhysicalOp = getPredecessors(plan, physicalOperator);
- Set<OperatorKey> operatorKeysOfAllPreds = new LinkedHashSet<OperatorKey>();
+ List<PhysicalOperator> predecessorsOfCurrentPhysicalOp = plan
+ .getPredecessors(physicalOperator);
+ if (predecessorsOfCurrentPhysicalOp != null && predecessorsOfCurrentPhysicalOp.size() > 1) {
+ Collections.sort(predecessorsOfCurrentPhysicalOp);
+ }
+
+ Set<OperatorKey> operatorKeysOfAllPreds = new HashSet<OperatorKey>();
addPredsFromPrevoiousSparkOp(sparkOperator, physicalOperator, operatorKeysOfAllPreds);
if (predecessorsOfCurrentPhysicalOp != null) {
for (PhysicalOperator predecessor : predecessorsOfCurrentPhysicalOp) {
@@ -294,29 +296,12 @@ public class JobGraphBuilder extends Spa
}
}
- private List<PhysicalOperator> getPredecessors(PhysicalPlan plan, PhysicalOperator op) {
- List preds = null;
- if (!(op instanceof POJoinGroupSpark)) {
- preds = plan.getPredecessors(op);
- if (preds != null && preds.size() > 1) {
- Collections.sort(preds);
- }
- } else {
- //For POJoinGroupSpark, we could not use plan.getPredecessors(op)+ sort to get
- //the predecessors with correct order, more detail see JoinOptimizerSpark#restructSparkOp
- preds = ((POJoinGroupSpark) op).getPredecessors();
- }
- return preds;
- }
-
//get all rdds of predecessors sorted by the OperatorKey
private List<RDD<Tuple>> sortPredecessorRDDs(Set<OperatorKey> operatorKeysOfAllPreds) {
List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
-// List<OperatorKey> operatorKeyOfAllPreds = Lists.newArrayList(operatorKeysOfAllPreds);
-// Collections.sort(operatorKeyOfAllPreds);
- //We need not sort operatorKeyOfAllPreds any more because operatorKeyOfAllPreds is LinkedHashSet
- //which provides the order of insertion, before we insert element which is sorted by OperatorKey
- for (OperatorKey operatorKeyOfAllPred : operatorKeysOfAllPreds) {
+ List<OperatorKey> operatorKeyOfAllPreds = Lists.newArrayList(operatorKeysOfAllPreds);
+ Collections.sort(operatorKeyOfAllPreds);
+ for (OperatorKey operatorKeyOfAllPred : operatorKeyOfAllPreds) {
predecessorRDDs.add(physicalOpRdds.get(operatorKeyOfAllPred));
}
return predecessorRDDs;
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1751847&r1=1751846&r2=1751847&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Fri Jul 8 02:35:08 2016
@@ -78,7 +78,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.converter.FilterConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.ForEachConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.GlobalRearrangeConverter;
-import org.apache.pig.backend.hadoop.executionengine.spark.converter.JoinGroupSparkConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.LocalRearrangeConverter;
@@ -95,11 +94,9 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
-import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.CombinerOptimizer;
-import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.JoinGroupOptimizerSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.MultiQueryOptimizerSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.NoopFilterRemover;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.ParallelismSetter;
@@ -117,7 +114,6 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.JarManager;
-import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
@@ -203,7 +199,6 @@ public class SparkLauncher extends Launc
convertMap.put(POPackage.class, new PackageConverter(confBytes));
convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter());
convertMap.put(POGlobalRearrangeSpark.class, new GlobalRearrangeConverter());
- convertMap.put(POJoinGroupSpark.class, new JoinGroupSparkConverter(confBytes));
convertMap.put(POLimit.class, new LimitConverter());
convertMap.put(PODistinct.class, new DistinctConverter());
convertMap.put(POUnion.class, new UnionConverter(sparkContext.sc()));
@@ -224,14 +219,9 @@ public class SparkLauncher extends Launc
new JobGraphBuilder(sparkplan, convertMap, sparkStats, sparkContext, jobMetricsListener, jobGroupID, jobConf, pigContext).visit();
cleanUpSparkJob(sparkStats);
sparkStats.finish();
- resetUDFContext();
return sparkStats;
}
- private void resetUDFContext() {
- UDFContext.getUDFContext().addJobConf(null);
- }
-
private void uploadResources(SparkOperPlan sparkPlan) throws IOException {
addFilesToSparkJob();
addJarsToSparkJob(sparkPlan);
@@ -283,12 +273,6 @@ public class SparkLauncher extends Launc
mqOptimizer.visit();
}
- //since JoinGroupOptimizerSpark modifies the plan and collapses LRA+GLA+PKG into POJoinGroupSpark while
- //CombinerOptimizer collapses GLA+PKG into ReduceBy, so if JoinGroupOptimizerSpark first, the spark plan will be
- //changed and not suitable for CombinerOptimizer.More detail see PIG-4797
- JoinGroupOptimizerSpark joinOptimizer = new JoinGroupOptimizerSpark(plan);
- joinOptimizer.visit();
-
if (LOG.isDebugEnabled()) {
System.out.println("after multiquery optimization:");
explain(plan, System.out, "text", true);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java?rev=1751847&r1=1751846&r2=1751847&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java Fri Jul 8 02:35:08 2016
@@ -19,10 +19,7 @@ package org.apache.pig.backend.hadoop.ex
import java.io.Serializable;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
/**
@@ -31,12 +28,9 @@ import org.apache.pig.data.Tuple;
* either empty (or is a tuple with one or more empty fields). In this case,
* we must respect the SQL standard as documented in the equals() method.
*/
-public class IndexedKey implements Serializable, Comparable {
- private static final Log LOG = LogFactory.getLog(IndexedKey.class);
+public class IndexedKey implements Serializable {
private byte index;
private Object key;
- private boolean useSecondaryKey;
- private boolean[] secondarySortOrder;
public IndexedKey(byte index, Object key) {
this.index = index;
@@ -146,40 +140,4 @@ public class IndexedKey implements Seria
}
return result;
}
-
- //firstly compare the index
- //secondly compare the key
- @Override
- public int compareTo(Object o) {
- IndexedKey that = (IndexedKey) o;
- int res = index - that.getIndex();
- if (res > 0) {
- return 1;
- } else if (res < 0) {
- return -1;
- } else {
- if (useSecondaryKey) {
- Tuple thisCompoundKey = (Tuple) key;
- Tuple thatCompoundKey = (Tuple) that.getKey();
- try {
- Object thisSecondary = thisCompoundKey.get(1);
- Object thatSecondaryKey = thatCompoundKey.get(1);
- return PigSecondaryKeyComparatorSpark.compareSecondaryKeys(thisSecondary, thatSecondaryKey, secondarySortOrder);
-
- } catch (ExecException e) {
- throw new RuntimeException("IndexedKey#compareTo throws exception ", e);
- }
- } else {
- return DataType.compare(key, that.getKey());
- }
- }
- }
-
- public void setUseSecondaryKey(boolean useSecondaryKey) {
- this.useSecondaryKey = useSecondaryKey;
- }
-
- public void setSecondarySortOrder(boolean[] secondarySortOrder) {
- this.secondarySortOrder = secondarySortOrder;
- }
}
\ No newline at end of file
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java?rev=1751847&r1=1751846&r2=1751847&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java Fri Jul 8 02:35:08 2016
@@ -66,7 +66,7 @@ class PigSecondaryKeyComparatorSpark imp
}
}
- public static int compareSecondaryKeys(Object o1, Object o2, boolean[] asc) {
+ private int compareSecondaryKeys(Object o1, Object o2, boolean[] asc) {
int rc = 0;
if (o1 != null && o2 != null && o1 instanceof Tuple && o2 instanceof Tuple) {
// objects are Tuples, we may need to apply sort order inside them
Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java?rev=1751847&r1=1751846&r2=1751847&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java Fri Jul 8 02:35:08 2016
@@ -65,7 +65,7 @@ import org.junit.Before;
import org.junit.Test;
public class TestEvalPipelineLocal {
-
+
private PigServer pigServer;
static final int MAX_SIZE = 100000;
@@ -1114,6 +1114,8 @@ public class TestEvalPipelineLocal {
@Test
public void testSetLocationCalledInFE() throws Exception {
+ // Need to reset it when running multiple testcases
+ UDFContext.getUDFContext().addJobConf(null);
File f1 = createFile(new String[]{"a","b"});
pigServer.registerQuery("a = load '" + Util.generateURI(f1.toString(), pigServer.getPigContext())
+ "' using " + SetLocationTestLoadFunc.class.getName()