Posted to commits@pig.apache.org by xu...@apache.org on 2015/05/15 15:00:03 UTC
svn commit: r1679557 - in /pig/branches/spark:
src/org/apache/pig/backend/hadoop/executionengine/spark/
src/org/apache/pig/backend/hadoop/executionengine/spark/converter/
src/org/apache/pig/backend/hadoop/executionengine/spark/operator/
src/org/apache/...
Author: xuefu
Date: Fri May 15 13:00:02 2015
New Revision: 1679557
URL: http://svn.apache.org/r1679557
Log:
PIG-4504: Enable Secondary key sort feature in spark mode (Liyun via Xuefu)
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java
pig/branches/spark/test/org/apache/pig/spark/
pig/branches/spark/test/org/apache/pig/spark/TestSecondarySortSpark.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java
pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java
pig/branches/spark/test/spark-tests
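
At a high level, this change lets the common group + nested ORDER BY pattern run with a single shuffle in spark mode. Below is a hedged Java sketch of the query shape it targets, driven through PigServer; the file names, field names and the "spark" exec type string are assumptions for illustration, not taken from this patch.

import org.apache.pig.PigServer;

// Hypothetical driver: the nested ORDER BY inside the FOREACH is what
// SecondaryKeyOptimizerSpark folds into the shuffle key, so no per-group
// in-memory sort is needed on the reduce side.
PigServer pig = new PigServer("spark");   // assumes the spark exec type registered on this branch
pig.registerQuery("A = LOAD 'input' AS (k:int, v:int);");
pig.registerQuery("B = GROUP A BY k;");
pig.registerQuery("C = FOREACH B { S = ORDER A BY v; GENERATE group, S; };");
pig.registerQuery("STORE C INTO 'output';");
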
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1679557&r1=1679556&r2=1679557&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Fri May 15 13:00:02 2015
@@ -42,6 +42,8 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigConstants;
import org.apache.pig.PigException;
import org.apache.pig.backend.BackendException;
@@ -55,7 +57,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
@@ -87,7 +88,9 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.converter.StoreConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.SecondaryKeyOptimizerSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -173,9 +176,9 @@ public class SparkLauncher extends Launc
convertMap.put(POFilter.class, new FilterConverter());
convertMap.put(POPackage.class, new PackageConverter(confBytes));
convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter());
- convertMap.put(POGlobalRearrange.class, new GlobalRearrangeConverter());
- convertMap.put(POLimit.class, new LimitConverter());
- convertMap.put(PODistinct.class, new DistinctConverter());
+ convertMap.put(POGlobalRearrangeSpark.class, new GlobalRearrangeConverter());
+ convertMap.put(POLimit.class, new LimitConverter());
+ convertMap.put(PODistinct.class, new DistinctConverter());
convertMap.put(POUnion.class, new UnionConverter(sparkContext.sc()));
convertMap.put(POSort.class, new SortConverter());
convertMap.put(POSplit.class, new SplitConverter());
@@ -193,14 +196,20 @@ public class SparkLauncher extends Launc
return sparkStats;
}
- private void optimize(PigContext pc, SparkOperPlan plan) throws VisitorException {
- boolean isAccumulator =
- Boolean.valueOf(pc.getProperties().getProperty("opt.accumulator","true"));
- if (isAccumulator) {
- AccumulatorOptimizer accumulatorOptimizer = new AccumulatorOptimizer(plan);
- accumulatorOptimizer.visit();
- }
- }
+ private void optimize(PigContext pc, SparkOperPlan plan) throws VisitorException {
+ String prop = pc.getProperties().getProperty(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY);
+ if (!pc.inIllustrator && !("true".equals(prop))) {
+ SecondaryKeyOptimizerSpark skOptimizer = new SecondaryKeyOptimizerSpark(plan);
+ skOptimizer.visit();
+ }
+
+ boolean isAccum =
+ Boolean.valueOf(pc.getProperties().getProperty("opt.accumulator", "true"));
+ if (isAccum) {
+ AccumulatorOptimizer accum = new AccumulatorOptimizer(plan);
+ accum.visit();
+ }
+ }
/**
* In Spark, currently only async actions return job id. There is no async
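
The new optimize() pass is gated on PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY. A minimal sketch of turning the pass off, assuming the property is supplied through the PigContext/PigServer properties (the surrounding wiring is not part of this patch):

import java.util.Properties;
import org.apache.pig.PigConfiguration;

// Setting this property to "true" makes optimize() skip SecondaryKeyOptimizerSpark,
// while the AccumulatorOptimizer pass (controlled by "opt.accumulator") still runs.
Properties props = new Properties();
props.setProperty(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY, "true");
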
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java?rev=1679557&r1=1679556&r2=1679557&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java Fri May 15 13:00:02 2015
@@ -20,20 +20,23 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.CoGroupedRDD;
import org.apache.spark.rdd.RDD;
@@ -41,26 +44,18 @@ import scala.Product2;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.Seq;
-//import scala.reflect.ClassManifest;
+import scala.runtime.AbstractFunction1;
@SuppressWarnings({ "serial" })
public class GlobalRearrangeConverter implements
- POConverter<Tuple, Tuple, POGlobalRearrange> {
+ POConverter<Tuple, Tuple, POGlobalRearrangeSpark> {
private static final Log LOG = LogFactory
.getLog(GlobalRearrangeConverter.class);
private static final TupleFactory tf = TupleFactory.getInstance();
-
- // GROUP FUNCTIONS
- private static final ToKeyValueFunction TO_KEY_VALUE_FUNCTION = new ToKeyValueFunction();
- private static final GetKeyFunction GET_KEY_FUNCTION = new GetKeyFunction();
- // COGROUP FUNCTIONS
- private static final GroupTupleFunction GROUP_TUPLE_FUNCTION = new GroupTupleFunction();
- private static final ToGroupKeyValueFunction TO_GROUP_KEY_VALUE_FUNCTION = new ToGroupKeyValueFunction();
-
@Override
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
- POGlobalRearrange physicalOperator) throws IOException {
+ POGlobalRearrangeSpark physicalOperator) throws IOException {
SparkUtil.assertPredecessorSizeGreaterThan(predecessors,
physicalOperator, 0);
int parallelism = SparkUtil.getParallelism(predecessors,
@@ -74,15 +69,32 @@ public class GlobalRearrangeConverter im
if (predecessors.size() == 1) {
// GROUP
- JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
- JavaPairRDD<Object, Iterable<Tuple>> prdd = jrdd.groupBy(GET_KEY_FUNCTION, parallelism);
- JavaRDD<Tuple> jrdd2 = prdd.map(GROUP_TUPLE_FUNCTION);
+ JavaPairRDD<Object, Iterable<Tuple>> prdd = null;
+ if (physicalOperator.isUseSecondaryKey()) {
+ RDD<Tuple> rdd = predecessors.get(0);
+ RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyNullValueFunction(),
+ SparkUtil.<Tuple, Object>getTuple2Manifest());
+
+ JavaPairRDD<Tuple, Object> pairRDD = new JavaPairRDD<Tuple, Object>(rddPair,
+ SparkUtil.getManifest(Tuple.class),
+ SparkUtil.getManifest(Object.class));
+
+ // first sort the tuples by the secondary key when secondary key sort is enabled
+ JavaPairRDD<Tuple, Object> sorted = pairRDD.repartitionAndSortWithinPartitions(new HashPartitioner(parallelism), new PigSecondaryKeyComparatorSpark(physicalOperator.getSecondarySortOrder()));
+ JavaRDD<Tuple> mapped = sorted.mapPartitions(new ToValueFunction());
+ prdd = mapped.groupBy(new GetKeyFunction(physicalOperator), parallelism);
+ } else {
+ JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
+ prdd = jrdd.groupBy(new GetKeyFunction(physicalOperator), parallelism);
+ }
+
+ JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(physicalOperator));
return jrdd2.rdd();
} else {
List<RDD<Tuple2<Object, Tuple>>> rddPairs = new ArrayList<RDD<Tuple2<Object, Tuple>>>();
for (RDD<Tuple> rdd : predecessors) {
JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
- JavaRDD<Tuple2<Object, Tuple>> rddPair = jrdd.map(TO_KEY_VALUE_FUNCTION);
+ JavaRDD<Tuple2<Object, Tuple>> rddPair = jrdd.map(new ToKeyValueFunction());
rddPairs.add(rddPair.rdd());
}
@@ -95,18 +107,155 @@ public class GlobalRearrangeConverter im
RDD<Tuple2<Object, Seq<Seq<Tuple>>>> rdd =
(RDD<Tuple2<Object, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
- return rdd.toJavaRDD().map(TO_GROUP_KEY_VALUE_FUNCTION).rdd();
+ return rdd.toJavaRDD().map(new ToGroupKeyValueFunction()).rdd();
+ }
+ }
+
+ private static class ToValueFunction implements
+ FlatMapFunction<Iterator<Tuple2<Tuple, Object>>, Tuple>, Serializable {
+
+ private class Tuple2TransformIterable implements Iterable<Tuple> {
+
+ Iterator<Tuple2<Tuple, Object>> in;
+
+ Tuple2TransformIterable(Iterator<Tuple2<Tuple, Object>> input) {
+ in = input;
+ }
+
+ public Iterator<Tuple> iterator() {
+ return new IteratorTransform<Tuple2<Tuple, Object>, Tuple>(in) {
+ @Override
+ protected Tuple transform(Tuple2<Tuple, Object> next) {
+ return next._1();
+ }
+ };
+ }
+ }
+
+ @Override
+ public Iterable<Tuple> call(Iterator<Tuple2<Tuple, Object>> input) {
+ return new Tuple2TransformIterable(input);
+ }
+ }
+
+ private static class ToKeyNullValueFunction extends
+ AbstractFunction1<Tuple, Tuple2<Tuple, Object>> implements
+ Serializable {
+
+ @Override
+ public Tuple2<Tuple, Object> apply(Tuple t) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sort ToKeyValueFunction in " + t);
+ }
+ Tuple key = t;
+ Object value = null;
+ // (key, value)
+ Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(key, value);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sort ToKeyValueFunction out " + out);
+ }
+ return out;
+ }
+ }
+
+ private static class PigSecondaryKeyComparatorSpark implements Comparator, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private static boolean[] secondarySortOrder;
+
+ public PigSecondaryKeyComparatorSpark(boolean[] pSecondarySortOrder) {
+ secondarySortOrder = pSecondarySortOrder;
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ Tuple t1 = (Tuple) o1;
+ Tuple t2 = (Tuple) o2;
+ try {
+ if ((t1.size() < 3) || (t2.size() < 3)) {
+ throw new RuntimeException("tuple size must bigger than 3, tuple[0] stands for index, tuple[1]" +
+ "stands for the compound key, tuple[3] stands for the value");
+ }
+ Tuple compoundKey1 = (Tuple) t1.get(1);
+ Tuple compoundKey2 = (Tuple) t2.get(1);
+ if ((compoundKey1.size() < 2) || (compoundKey2.size() < 2)) {
+ throw new RuntimeException("compoundKey size must bigger than, compoundKey[0] stands for firstKey," +
+ "compoundKey[1] stands for secondaryKey");
+ }
+ Object secondaryKey1 = compoundKey1.get(1);
+ Object secondaryKey2 = compoundKey2.get(1);
+ int res = compareSecondaryKeys(secondaryKey1, secondaryKey2, secondarySortOrder);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("t1:" + t1 + "t2:" + t2 + " res:" + res);
+ }
+ return res;
+ } catch (ExecException e) {
+ throw new RuntimeException("Fail to get the compoundKey", e);
+ }
+ }
+
+ private int compareSecondaryKeys(Object o1, Object o2, boolean[] asc) {
+ int rc = 0;
+ if (o1 != null && o2 != null && o1 instanceof Tuple && o2 instanceof Tuple) {
+ // objects are Tuples, we may need to apply sort order inside them
+ Tuple t1 = (Tuple) o1;
+ Tuple t2 = (Tuple) o2;
+ int sz1 = t1.size();
+ int sz2 = t2.size();
+ if (sz2 < sz1) {
+ return 1;
+ } else if (sz2 > sz1) {
+ return -1;
+ } else {
+ for (int i = 0; i < sz1; i++) {
+ try {
+ rc = DataType.compare(t1.get(i), t2.get(i));
+ if (rc != 0 && asc != null && asc.length > 1 && !asc[i])
+ rc *= -1;
+ if ((t1.get(i) == null) || (t2.get(i) == null)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("t1.get(i) is:" + t1.get(i) + " t2.get(i) is:" + t2.get(i));
+ }
+ }
+ if (rc != 0) break;
+ } catch (ExecException e) {
+ throw new RuntimeException("Unable to compare tuples", e);
+ }
+ }
+ }
+ } else {
+ // objects are NOT Tuples, delegate to DataType.compare()
+ rc = DataType.compare(o1, o2);
+ }
+ // apply sort order for keys that are not tuples or for whole tuples
+ if (asc != null && asc.length == 1 && !asc[0])
+ rc *= -1;
+ return rc;
}
}
private static class GetKeyFunction implements Function<Tuple, Object>, Serializable {
+ public final POGlobalRearrangeSpark glrSpark;
+
+ public GetKeyFunction(POGlobalRearrangeSpark globalRearrangeSpark) {
+ this.glrSpark = globalRearrangeSpark;
+ }
public Object call(Tuple t) {
try {
- LOG.debug("GetKeyFunction in " + t);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("GetKeyFunction in " + t);
+ }
// see PigGenericMapReduce For the key
- Object key = t.get(1);
- LOG.debug("GetKeyFunction out " + key);
+ Object key = null;
+ if ((glrSpark != null) && (glrSpark.isUseSecondaryKey())) {
+ key = ((Tuple) t.get(1)).get(0);
+ } else {
+ key = t.get(1);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("GetKeyFunction out " + key);
+ }
return key;
} catch (ExecException e) {
throw new RuntimeException(e);
@@ -116,14 +265,23 @@ public class GlobalRearrangeConverter im
private static class GroupTupleFunction implements Function<Tuple2<Object, Iterable<Tuple>>, Tuple>,
Serializable {
+ public final POGlobalRearrangeSpark glrSpark;
+
+ public GroupTupleFunction(POGlobalRearrangeSpark globalRearrangeSpark) {
+ this.glrSpark = globalRearrangeSpark;
+ }
public Tuple call(Tuple2<Object, Iterable<Tuple>> v1) {
try {
- LOG.debug("GroupTupleFunction in " + v1);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("GroupTupleFunction in " + v1);
+ }
Tuple tuple = tf.newTuple(2);
tuple.set(0, v1._1()); // the (index, key) tuple
tuple.set(1, v1._2().iterator()); // the Seq<Tuple> aka bag of values
- LOG.debug("GroupTupleFunction out " + tuple);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("GroupTupleFunction out " + tuple);
+ }
return tuple;
} catch (ExecException e) {
throw new RuntimeException(e);
@@ -138,13 +296,17 @@ public class GlobalRearrangeConverter im
public Tuple2<Object, Tuple> call(Tuple t) {
try {
// (index, key, value)
- LOG.debug("ToKeyValueFunction in " + t);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ToKeyValueFunction in " + t);
+ }
Object key = t.get(1);
Tuple value = (Tuple) t.get(2); // value
// (key, value)
Tuple2<Object, Tuple> out = new Tuple2<Object, Tuple>(key,
value);
- LOG.debug("ToKeyValueFunction out " + out);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ToKeyValueFunction out " + out);
+ }
return out;
} catch (ExecException e) {
throw new RuntimeException(e);
@@ -158,7 +320,9 @@ public class GlobalRearrangeConverter im
@Override
public Tuple call(Tuple2<Object, Seq<Seq<Tuple>>> input) {
try {
- LOG.debug("ToGroupKeyValueFunction2 in " + input);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ToGroupKeyValueFunction2 in " + input);
+ }
final Object key = input._1();
Object obj = input._2();
// XXX this is a hack for Spark 1.1.0: the type is an Array, not Seq
@@ -190,7 +354,9 @@ public class GlobalRearrangeConverter im
Tuple out = tf.newTuple(2);
out.set(0, key);
out.set(1, new IteratorUnion<Tuple>(tupleIterators.iterator()));
- LOG.debug("ToGroupKeyValueFunction2 out " + out);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ToGroupKeyValueFunction2 out " + out);
+ }
return out;
} catch (Exception e) {
throw new RuntimeException(e);
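The converter's secondary-key path above follows the usual Spark secondary-sort idiom: carry the sort key inside the shuffle key, partition on the grouping part only, and let repartitionAndSortWithinPartitions order each partition. A minimal, self-contained Java sketch of that idiom follows; it uses plain Java types instead of Pig Tuples, and the class and field names are illustrative only, not code from this patch.

import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;

import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class SecondarySortSketch {

    // Compound key: the grouping key plus the secondary (sort) key.
    static class CompoundKey implements Serializable {
        final String groupKey;
        final int secondaryKey;
        CompoundKey(String g, int s) { groupKey = g; secondaryKey = s; }
    }

    // Partition on the grouping key only, so all records of one group land in one partition.
    static class GroupKeyPartitioner extends Partitioner {
        private final HashPartitioner delegate;
        GroupKeyPartitioner(int partitions) { delegate = new HashPartitioner(partitions); }
        @Override public int numPartitions() { return delegate.numPartitions(); }
        @Override public int getPartition(Object key) {
            return delegate.getPartition(((CompoundKey) key).groupKey);
        }
    }

    // Order by the grouping key first, then by the secondary key.
    static class CompoundKeyComparator implements Comparator<CompoundKey>, Serializable {
        @Override public int compare(CompoundKey a, CompoundKey b) {
            int rc = a.groupKey.compareTo(b.groupKey);
            return rc != 0 ? rc : Integer.compare(a.secondaryKey, b.secondaryKey);
        }
    }

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "secondary-sort-sketch");

        JavaRDD<Tuple2<String, Integer>> input = sc.parallelize(Arrays.asList(
                new Tuple2<String, Integer>("a", 3),
                new Tuple2<String, Integer>("a", 1),
                new Tuple2<String, Integer>("b", 2)));

        // Build (compound key, value) pairs. The converter above keeps a null value and carries
        // everything inside the key; here the original value is kept for readability.
        JavaPairRDD<CompoundKey, Integer> keyed = input.mapToPair(
                t -> new Tuple2<CompoundKey, Integer>(new CompoundKey(t._1(), t._2()), t._2()));

        // One shuffle: partition by grouping key, sort each partition by (group, secondary) key.
        JavaPairRDD<CompoundKey, Integer> sorted = keyed.repartitionAndSortWithinPartitions(
                new GroupKeyPartitioner(2), new CompoundKeyComparator());

        for (Tuple2<CompoundKey, Integer> p : sorted.collect()) {
            System.out.println(p._1().groupKey + " -> " + p._1().secondaryKey);
        }
        sc.stop();
    }
}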
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java?rev=1679557&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java Fri May 15 13:00:02 2015
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+
+/**
+ * POGlobalRearrange for spark mode
+ */
+public class POGlobalRearrangeSpark extends POGlobalRearrange {
+ // Use secondary key
+ private boolean useSecondaryKey;
+ // Sort order for secondary keys.
+ private boolean[] secondarySortOrder;
+
+ public POGlobalRearrangeSpark(POGlobalRearrange copy) {
+ super(copy.getOperatorKey());
+ }
+
+ public boolean isUseSecondaryKey() {
+ return useSecondaryKey;
+ }
+
+ public void setUseSecondaryKey(boolean useSecondaryKey) {
+ this.useSecondaryKey = useSecondaryKey;
+ }
+
+ public boolean[] getSecondarySortOrder() {
+ return secondarySortOrder;
+ }
+
+ public void setSecondarySortOrder(boolean[] secondarySortOrder) {
+ this.secondarySortOrder = secondarySortOrder;
+ }
+}
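For context, a hedged fragment showing how this operator is meant to be flagged once the optimizer decides a nested sort can ride on the shuffle; the OperatorKey values and the sort order array are made up for illustration, not taken from this patch.

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import org.apache.pig.impl.plan.OperatorKey;

// Wrap a plain POGlobalRearrange in the Spark-specific operator and mark it for
// secondary-key sorting (one ascending secondary key in this hypothetical example).
POGlobalRearrange plain = new POGlobalRearrange(new OperatorKey("scope", 1L));
POGlobalRearrangeSpark glr = new POGlobalRearrangeSpark(plain);
glr.setUseSecondaryKey(true);
glr.setSecondarySortOrder(new boolean[] { true });
// GlobalRearrangeConverter.convert() later checks glr.isUseSecondaryKey() to pick the sorted path.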
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java?rev=1679557&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java Fri May 15 13:00:02 2015
@@ -0,0 +1,218 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.List;
+
+import org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.util.SecondaryKeyOptimizerUtil;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Secondary key sort optimization for spark mode
+ */
+public class SecondaryKeyOptimizerSpark extends SparkOpPlanVisitor implements SecondaryKeyOptimizer {
+ private static final Log LOG = LogFactory
+ .getLog(SecondaryKeyOptimizerSpark.class);
+
+ private int numSortRemoved = 0;
+ private int numDistinctChanged = 0;
+ private int numUseSecondaryKey = 0;
+
+ public SecondaryKeyOptimizerSpark(SparkOperPlan plan) {
+ super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan));
+ }
+
+ /**
+ * Secondary key sort optimization applies to the nested group + foreach case, such as TestAccumulator#testAccumWithSort.
+ * After SecondaryKeyOptimizerUtil.applySecondaryKeySort is called, the POSort inside the POForEach is removed from the spark plan;
+ * the sorting is still performed, via the secondary key sort done during the shuffle, even though the POSort is gone.
+ *
+ * @param sparkOperator
+ * @throws VisitorException
+ */
+ @Override
+ public void visitSparkOp(SparkOperator sparkOperator) throws VisitorException {
+ List<POLocalRearrange> rearranges = PlanHelper.getPhysicalOperators(sparkOperator.physicalPlan, POLocalRearrange.class);
+ if (rearranges.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No POLocalRearranges found in the sparkOperator.Secondary key optimization is no need");
+ }
+ return;
+ }
+
+ /**
+ * For every POLocalRearrange encountered in sparkOperator.physicalPlan, the sub-plan between the
+ * previous POLocalRearrange (or the root) and the current one is treated as the mapPlan (in MapReduce
+ * terms), and the sub-plan between the POGlobalRearrange (the successor of the current POLocalRearrange)
+ * and the next POLocalRearrange (or the leaf) is treated as the reducePlan. Once the mapPlan and reducePlan
+ * are obtained, SecondaryKeyOptimizerUtil.applySecondaryKeySort(mapPlan, reducePlan) enables the secondary
+ * key optimization: it removes the POSort inside the foreach of the reducePlan, or changes a PODistinct
+ * there into a POSortedDistinct.
+ for (POLocalRearrange currentLR : rearranges) {
+ PhysicalPlan mapPlan = null;
+ PhysicalPlan reducePlan = null;
+ try {
+ mapPlan = getMapPlan(sparkOperator.physicalPlan, currentLR);
+ } catch (PlanException e) {
+ throw new RuntimeException(e);
+ }
+ try {
+ reducePlan = getReducePlan(sparkOperator.physicalPlan, currentLR);
+ } catch (PlanException e) {
+ throw new RuntimeException(e);
+ }
+
+ // The current code does not enable secondary key optimization for the join case
+ List<PhysicalOperator> rootsOfReducePlan = reducePlan.getRoots();
+ if (rootsOfReducePlan.get(0) instanceof POGlobalRearrangeSpark) {
+ PhysicalOperator glr = rootsOfReducePlan.get(0);
+ List<PhysicalOperator> predecessors = sparkOperator.physicalPlan.getPredecessors(glr);
+ if (predecessors != null && predecessors.size() >= 2) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Current code does not enable secondarykey optimization when join case is encounted");
+ }
+ return;
+ }
+ }
+
+ if (mapPlan.getOperator(currentLR.getOperatorKey()) == null) {
+ // The POLocalRearrange is part of a sub-plan of a POSplit
+ mapPlan = PlanHelper.getLocalRearrangePlanFromSplit(mapPlan, currentLR.getOperatorKey());
+ }
+
+ SecondaryKeyOptimizerUtil.setIsSparkMode(true);
+ SecondaryKeyOptimizerUtil.SecondaryKeyOptimizerInfo info = SecondaryKeyOptimizerUtil.applySecondaryKeySort(mapPlan, reducePlan);
+ if (info != null) {
+ numSortRemoved += info.getNumSortRemoved();
+ numDistinctChanged += info.getNumDistinctChanged();
+ numUseSecondaryKey += info.getNumUseSecondaryKey();
+ }
+ }
+ }
+
+ /**
+ * Find the mapPlan of the physicalPlan containing currentLR.
+ * Search backwards through the physicalOperators that precede currentLR until the previous POLocalRearrange
+ * is found or the root of the physicalPlan is reached.
+ *
+ * @param physicalPlan
+ * @param currentLR
+ * @return
+ * @throws VisitorException
+ * @throws PlanException
+ */
+ private PhysicalPlan getMapPlan(PhysicalPlan physicalPlan, POLocalRearrange currentLR) throws VisitorException, PlanException {
+ PhysicalPlan mapPlan = new PhysicalPlan();
+ mapPlan.addAsRoot(currentLR);
+ List<PhysicalOperator> preList = physicalPlan.getPredecessors(currentLR);
+ while (true) {
+ if (preList == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("there is nothing to backward search");
+ }
+ break;
+ }
+ if (preList.size() != 1) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("the size of predecessor of currentLR should be 1 but now it is not 1,physicalPlan:" + physicalPlan);
+ }
+ break;
+ }
+ PhysicalOperator pre = preList.get(0);
+ if (pre instanceof POLocalRearrange) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Finishing to find the mapPlan between preLR and currentLR.");
+ }
+ break;
+ }
+ mapPlan.addAsRoot(pre);
+ preList = physicalPlan.getPredecessors(pre);
+
+ }
+ return mapPlan;
+ }
+
+ /**
+ * Find the reducePlan of the physicalPlan containing currentLR.
+ * Search forwards through the physicalOperators that succeed currentLR until the next POLocalRearrange
+ * is found or the leaf of the physicalPlan is reached.
+ *
+ * @param physicalPlan
+ * @param currentLR
+ * @return
+ * @throws PlanException
+ */
+ private PhysicalPlan getReducePlan(PhysicalPlan physicalPlan, POLocalRearrange currentLR) throws PlanException {
+ PhysicalPlan reducePlan = new PhysicalPlan();
+ List<PhysicalOperator> succList = physicalPlan.getSuccessors(currentLR);
+ while (true) {
+ if (succList == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("there is nothing to forward search");
+ }
+ break;
+ }
+ if (succList.size() != 1) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("the size of successors of currentLR should be 1 but now it is not 1,physicalPlan:" + physicalPlan);
+ }
+ break;
+ }
+ PhysicalOperator succ = succList.get(0);
+ if (succ instanceof POLocalRearrange) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Finishing to find the ReducePlan between currentLR and netxtLR.");
+ }
+ break;
+ }
+ reducePlan.addAsLeaf(succ);
+ succList = physicalPlan.getSuccessors(succ);
+ }
+ return reducePlan;
+ }
+
+ @Override
+ public int getNumSortRemoved() {
+ return numSortRemoved;
+ }
+
+ @Override
+ public int getNumDistinctChanged() {
+ return numDistinctChanged;
+ }
+
+ @Override
+ public int getNumUseSecondaryKey() {
+ return numUseSecondaryKey;
+ }
+}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1679557&r1=1679556&r2=1679557&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Fri May 15 13:00:02 2015
@@ -57,6 +57,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -551,9 +552,10 @@ public class SparkCompiler extends PhyPl
public void visitGlobalRearrange(POGlobalRearrange op)
throws VisitorException {
try {
- addToPlan(op);
- curSparkOp.customPartitioner = op.getCustomPartitioner();
- phyToSparkOpMap.put(op, curSparkOp);
+ POGlobalRearrangeSpark glbOp = new POGlobalRearrangeSpark(op);
+ addToPlan(glbOp);
+ curSparkOp.customPartitioner = op.getCustomPartitioner();
+ phyToSparkOpMap.put(op, curSparkOp);
} catch (Exception e) {
int errCode = 2034;
String msg = "Error compiling operator "
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java?rev=1679557&r1=1679556&r2=1679557&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java Fri May 15 13:00:02 2015
@@ -17,6 +17,7 @@
*/
package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+import java.util.BitSet;
import java.util.HashSet;
import java.util.Set;
@@ -58,7 +59,7 @@ public class SparkOperator extends Opera
public int requestedParallelism = -1;
- private OPER_FEATURE feature = OPER_FEATURE.NONE;
+ private BitSet feature = new BitSet();
private boolean splitter = false;
@@ -118,28 +119,28 @@ public class SparkOperator extends Opera
}
public boolean isGroupBy() {
- return (feature == OPER_FEATURE.GROUPBY);
- }
+ return feature.get(OPER_FEATURE.GROUPBY.ordinal());
+ }
public void markGroupBy() {
- feature = OPER_FEATURE.GROUPBY;
- }
+ feature.set(OPER_FEATURE.GROUPBY.ordinal());
+ }
public boolean isCogroup() {
- return (feature == OPER_FEATURE.COGROUP);
- }
+ return feature.get(OPER_FEATURE.COGROUP.ordinal());
+ }
public void markCogroup() {
- feature = OPER_FEATURE.COGROUP;
- }
+ feature.set(OPER_FEATURE.COGROUP.ordinal());
+ }
public boolean isRegularJoin() {
- return (feature == OPER_FEATURE.HASHJOIN);
- }
+ return feature.get(OPER_FEATURE.HASHJOIN.ordinal());
+ }
public void markRegularJoin() {
- feature = OPER_FEATURE.HASHJOIN;
- }
+ feature.set(OPER_FEATURE.HASHJOIN.ordinal());
+ }
public int getRequestedParallelism() {
return requestedParallelism;
@@ -154,12 +155,12 @@ public class SparkOperator extends Opera
}
public boolean isSampler() {
- return (feature == OPER_FEATURE.SAMPLER);
- }
+ return feature.get(OPER_FEATURE.SAMPLER.ordinal());
+ }
public void markSampler() {
- feature = OPER_FEATURE.SAMPLER;
- }
+ feature.set(OPER_FEATURE.SAMPLER.ordinal());
+ }
public void setSkewedJoinPartitionFile(String file) {
skewedJoinPartitionFile = file;
@@ -186,10 +187,10 @@ public class SparkOperator extends Opera
}
public boolean isIndexer() {
- return (feature == OPER_FEATURE.INDEXER);
- }
+ return feature.get(OPER_FEATURE.INDEXER.ordinal());
+ }
- public void markIndexer() {
- feature = OPER_FEATURE.INDEXER;
- }
+ public void markIndexer() {
+ feature.set(OPER_FEATURE.INDEXER.ordinal());
+ }
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java?rev=1679557&r1=1679556&r2=1679557&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java Fri May 15 13:00:02 2015
@@ -18,11 +18,14 @@
package org.apache.pig.backend.hadoop.executionengine.spark.plan;
import java.io.PrintStream;
+import java.util.List;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
@@ -46,21 +49,27 @@ public class SparkPrinter extends SparkO
isVerbose = verbose;
}
- @Override
- public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
- mStream.println("");
- mStream.println("Spark node " + sparkOp.getOperatorKey().toString());
- if (sparkOp instanceof NativeSparkOperator) {
- mStream.println("--------");
- mStream.println();
- return;
- }
- if (sparkOp.physicalPlan != null && sparkOp.physicalPlan.size() > 0) {
- PlanPrinter<PhysicalOperator, PhysicalPlan> printer = new PlanPrinter<PhysicalOperator, PhysicalPlan>(
- sparkOp.physicalPlan, mStream);
- printer.setVerbose(isVerbose);
- printer.visit();
- mStream.println("--------");
- }
- }
+ @Override
+ public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+ mStream.println("");
+ mStream.println("Spark node " + sparkOp.getOperatorKey().toString());
+ if (sparkOp instanceof NativeSparkOperator) {
+ mStream.println("--------");
+ mStream.println();
+ return;
+ }
+ if (sparkOp.physicalPlan != null && sparkOp.physicalPlan.size() > 0) {
+ PlanPrinter<PhysicalOperator, PhysicalPlan> printer = new PlanPrinter<PhysicalOperator, PhysicalPlan>(
+ sparkOp.physicalPlan, mStream);
+ printer.setVerbose(isVerbose);
+ printer.visit();
+ mStream.println("--------");
+ }
+ List<POGlobalRearrangeSpark> glrList = PlanHelper.getPhysicalOperators(sparkOp.physicalPlan, POGlobalRearrangeSpark.class);
+ for (POGlobalRearrangeSpark glr : glrList) {
+ if (glr.isUseSecondaryKey()) {
+ mStream.println("POGlobalRearrange(" + glr.getOperatorKey() + ") uses secondaryKey");
+ }
+ }
+ }
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java?rev=1679557&r1=1679556&r2=1679557&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java Fri May 15 13:00:02 2015
@@ -38,6 +38,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
@@ -297,8 +298,8 @@ public class AccumulatorOptimizerUtil {
return;
}
- List<POGlobalRearrange> gras = PlanHelper.getPhysicalOperators(plan,
- POGlobalRearrange.class);
+ List<POGlobalRearrangeSpark> gras = PlanHelper.getPhysicalOperators(plan,
+ POGlobalRearrangeSpark.class);
for (POGlobalRearrange gra : gras) {
addAccumulatorSparkForGRASubDAG(plan, gra);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java?rev=1679557&r1=1679556&r2=1679557&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java Fri May 15 13:00:02 2015
@@ -35,12 +35,14 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.io.PigNullableWritable;
@@ -53,6 +55,7 @@ import org.apache.pig.impl.plan.VisitorE
@InterfaceAudience.Private
public class SecondaryKeyOptimizerUtil {
private static Log log = LogFactory.getLog(SecondaryKeyOptimizerUtil.class.getName());
+ private static boolean isSparkMode;
private SecondaryKeyOptimizerUtil() {
@@ -241,14 +244,33 @@ public class SecondaryKeyOptimizerUtil {
}
PhysicalOperator root = reduceRoots.get(0);
- if (!(root instanceof POPackage)) {
- log.debug("Expected reduce root to be a POPackage, skip secondary key optimizing");
- return null;
+ PhysicalOperator currentNode = null;
+ if (!isSparkMode) {
+ if (!(root instanceof POPackage)) {
+ log.debug("Expected reduce root to be a POPackage, skip secondary key optimizing");
+ return null;
+ } else {
+ currentNode = root;
+ }
+ } else {
+ if (!(root instanceof POGlobalRearrange)) {
+ log.debug("Expected reduce root to be a POGlobalRearrange, skip secondary key optimizing");
+ return null;
+ } else {
+ List<PhysicalOperator> globalRearrangeSuccs = reducePlan
+ .getSuccessors(root);
+ if (globalRearrangeSuccs.size() == 1) {
+ currentNode = globalRearrangeSuccs.get(0);
+ } else {
+ log.debug("Expected successor of a POGlobalRearrange is POPackage, skip secondary key optimizing");
+ return null;
+ }
+ }
}
// visit the POForEach of the reduce plan. We can have Limit and Filter
// in the middle
- PhysicalOperator currentNode = root;
+
POForEach foreach = null;
while (currentNode != null) {
if (currentNode instanceof POPackage
@@ -402,8 +424,15 @@ public class SecondaryKeyOptimizerUtil {
throw new VisitorException("Cannot find POLocalRearrange to set secondary plan", errorCode);
}
}
- POPackage pack = (POPackage) root;
- pack.getPkgr().setUseSecondaryKey(true);
+
+ if (root instanceof POGlobalRearrangeSpark) {
+ POGlobalRearrangeSpark plg = (POGlobalRearrangeSpark) root;
+ plg.setUseSecondaryKey(true);
+ plg.setSecondarySortOrder(secondarySortKeyInfo.getAscs());
+ } else if (root instanceof POPackage) {
+ POPackage pack = (POPackage) root;
+ pack.getPkgr().setUseSecondaryKey(true);
+ }
}
return secKeyOptimizerInfo;
}
@@ -658,4 +687,7 @@ public class SecondaryKeyOptimizerUtil {
return false;
}
+ public static void setIsSparkMode(boolean isSparkMode) {
+ SecondaryKeyOptimizerUtil.isSparkMode = isSparkMode;
+ }
}
Modified: pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=1679557&r1=1679556&r2=1679557&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java Fri May 15 13:00:02 2015
@@ -534,6 +534,24 @@ public abstract class OperatorPlan<E ext
connect(oper, leaf);
}
}
+
+ /**
+ * Adds the root operator to the plan and connects
+ * all existing roots the new root
+ *
+ * @param root
+ * @throws PlanException
+ */
+ public void addAsRoot(E root) throws PlanException {
+ List<E> oldRoots = new ArrayList<E>();
+ for (E operator : getRoots()) {
+ oldRoots.add(operator);
+ }
+ add(root);
+ for (E oper : oldRoots) {
+ connect(root, oper);
+ }
+ }
public boolean isSingleLeafPlan() {
List<E> tmpList = getLeaves() ;
Added: pig/branches/spark/test/org/apache/pig/spark/TestSecondarySortSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/spark/TestSecondarySortSpark.java?rev=1679557&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/spark/TestSecondarySortSpark.java (added)
+++ pig/branches/spark/test/org/apache/pig/spark/TestSecondarySortSpark.java Fri May 15 13:00:02 2015
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.spark;
+
+import org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.SecondaryKeyOptimizerSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.test.MiniGenericCluster;
+import org.apache.pig.test.TestSecondarySort;
+import org.apache.pig.test.Util;
+
+/**
+ * TestSecondarySortSpark.
+ */
+public class TestSecondarySortSpark extends TestSecondarySort {
+
+ public TestSecondarySortSpark() {
+ super();
+ }
+
+ @Override
+ public MiniGenericCluster getCluster() {
+ return MiniGenericCluster.buildCluster(MiniGenericCluster.EXECTYPE_SPARK);
+ }
+
+ @Override
+ public SecondaryKeyOptimizer visitSecondaryKeyOptimizer(String query) throws Exception {
+ PhysicalPlan pp = Util.buildPp(pigServer, query);
+ SparkCompiler comp = new SparkCompiler(pp, pc);
+ comp.compile();
+ SparkOperPlan sparkPlan = comp.getSparkPlan();
+ SecondaryKeyOptimizerSpark optimizer = new SecondaryKeyOptimizerSpark(sparkPlan);
+ optimizer.visit();
+ return optimizer;
+ }
+}
Modified: pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java?rev=1679557&r1=1679556&r2=1679557&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java Fri May 15 13:00:02 2015
@@ -483,6 +483,9 @@ public abstract class TestSecondarySort
@Test
// Once custom partitioner is used, we cannot use secondary key optimizer, see PIG-3827
public void testCustomPartitionerWithSort() throws Exception {
+ if( Util.isSparkExecType(cluster.getExecType())){
+ return;
+ }
File tmpFile1 = Util.createTempFileDelOnExit("test", "txt");
PrintStream ps1 = new PrintStream(new FileOutputStream(tmpFile1));
ps1.println("1\t2\t3");
Modified: pig/branches/spark/test/spark-tests
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/spark-tests?rev=1679557&r1=1679556&r2=1679557&view=diff
==============================================================================
--- pig/branches/spark/test/spark-tests (original)
+++ pig/branches/spark/test/spark-tests Fri May 15 13:00:02 2015
@@ -63,3 +63,4 @@
**/TestMergeJoin.java
**/TestNativeMapReduce.java
**/TestPigProgressReporting.java
+**/TestSecondarySortSpark.java