You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2016/07/08 02:37:37 UTC
svn commit: r1751848 - in
/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark:
./ converter/ operator/ optimizer/
Author: xuefu
Date: Fri Jul 8 02:37:36 2016
New Revision: 1751848
URL: http://svn.apache.org/viewvc?rev=1751848&view=rev
Log:
PIG-4797: Optimization for join/group case for spark mode (Liyun via Xuefu)
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1751848&r1=1751847&r2=1751848&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java Fri Jul 8 02:37:36 2016
@@ -23,6 +23,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -48,6 +50,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -238,13 +241,8 @@ public class JobGraphBuilder extends Spa
Set<OperatorKey> predsFromPreviousSparkOper)
throws IOException {
RDD<Tuple> nextRDD = null;
- List<PhysicalOperator> predecessorsOfCurrentPhysicalOp = plan
- .getPredecessors(physicalOperator);
- if (predecessorsOfCurrentPhysicalOp != null && predecessorsOfCurrentPhysicalOp.size() > 1) {
- Collections.sort(predecessorsOfCurrentPhysicalOp);
- }
-
- Set<OperatorKey> operatorKeysOfAllPreds = new HashSet<OperatorKey>();
+ List<PhysicalOperator> predecessorsOfCurrentPhysicalOp = getPredecessors(plan, physicalOperator);
+ Set<OperatorKey> operatorKeysOfAllPreds = new LinkedHashSet<OperatorKey>();
addPredsFromPrevoiousSparkOp(sparkOperator, physicalOperator, operatorKeysOfAllPreds);
if (predecessorsOfCurrentPhysicalOp != null) {
for (PhysicalOperator predecessor : predecessorsOfCurrentPhysicalOp) {
@@ -296,12 +294,29 @@ public class JobGraphBuilder extends Spa
}
}
+    /**
+     * Returns the predecessors of {@code op} in the order their RDDs must be passed to the
+     * operator's converter.
+     * <p>
+     * For ordinary operators this is {@code plan.getPredecessors(op)} sorted by OperatorKey. The
+     * sort happens in place on the list the plan returns, which is the pre-existing behaviour. For
+     * {@link POJoinGroupSpark}, the order recorded by the optimizer is returned instead.
+     *
+     * @return the ordered predecessors, or null if the plan/operator reports none
+     */
+    private List<PhysicalOperator> getPredecessors(PhysicalPlan plan, PhysicalOperator op) {
+        if (op instanceof POJoinGroupSpark) {
+            // plan.getPredecessors(op) + sort does not give the right order for POJoinGroupSpark:
+            // its inputs must line up with its collapsed POLocalRearranges. For details, see
+            // JoinGroupOptimizerSpark#restructSparkOp.
+            return ((POJoinGroupSpark) op).getPredecessors();
+        }
+        List<PhysicalOperator> preds = plan.getPredecessors(op);
+        if (preds != null && preds.size() > 1) {
+            Collections.sort(preds);
+        }
+        return preds;
+    }
+
//get all rdds of predecessors sorted by the OperatorKey
private List<RDD<Tuple>> sortPredecessorRDDs(Set<OperatorKey> operatorKeysOfAllPreds) {
List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
- List<OperatorKey> operatorKeyOfAllPreds = Lists.newArrayList(operatorKeysOfAllPreds);
- Collections.sort(operatorKeyOfAllPreds);
- for (OperatorKey operatorKeyOfAllPred : operatorKeyOfAllPreds) {
+// List<OperatorKey> operatorKeyOfAllPreds = Lists.newArrayList(operatorKeysOfAllPreds);
+// Collections.sort(operatorKeyOfAllPreds);
+ //We need not sort operatorKeyOfAllPreds any more because operatorKeyOfAllPreds is LinkedHashSet
+ //which provides the order of insertion, before we insert element which is sorted by OperatorKey
+ for (OperatorKey operatorKeyOfAllPred : operatorKeysOfAllPreds) {
predecessorRDDs.add(physicalOpRdds.get(operatorKeyOfAllPred));
}
return predecessorRDDs;
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1751848&r1=1751847&r2=1751848&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Fri Jul 8 02:37:36 2016
@@ -78,6 +78,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.converter.FilterConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.ForEachConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.GlobalRearrangeConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.JoinGroupSparkConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.LocalRearrangeConverter;
@@ -94,9 +95,11 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.CombinerOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.JoinGroupOptimizerSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.MultiQueryOptimizerSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.NoopFilterRemover;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.ParallelismSetter;
@@ -114,6 +117,7 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
@@ -199,6 +203,7 @@ public class SparkLauncher extends Launc
convertMap.put(POPackage.class, new PackageConverter(confBytes));
convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter());
convertMap.put(POGlobalRearrangeSpark.class, new GlobalRearrangeConverter());
+ convertMap.put(POJoinGroupSpark.class, new JoinGroupSparkConverter(confBytes));
convertMap.put(POLimit.class, new LimitConverter());
convertMap.put(PODistinct.class, new DistinctConverter());
convertMap.put(POUnion.class, new UnionConverter(sparkContext.sc()));
@@ -273,6 +278,12 @@ public class SparkLauncher extends Launc
mqOptimizer.visit();
}
+ //since JoinGroupOptimizerSpark modifies the plan and collapses LRA+GLA+PKG into POJoinGroupSpark while
+ //CombinerOptimizer collapses GLA+PKG into ReduceBy, so if JoinGroupOptimizerSpark first, the spark plan will be
+ //changed and not suitable for CombinerOptimizer.More detail see PIG-4797
+ JoinGroupOptimizerSpark joinOptimizer = new JoinGroupOptimizerSpark(plan);
+ joinOptimizer.visit();
+
if (LOG.isDebugEnabled()) {
System.out.println("after multiquery optimization:");
explain(plan, System.out, "text", true);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java?rev=1751848&r1=1751847&r2=1751848&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java Fri Jul 8 02:37:36 2016
@@ -19,7 +19,10 @@ package org.apache.pig.backend.hadoop.ex
import java.io.Serializable;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
/**
@@ -28,9 +31,12 @@ import org.apache.pig.data.Tuple;
* either empty (or is a tuple with one or more empty fields). In this case,
* we must respect the SQL standard as documented in the equals() method.
*/
-public class IndexedKey implements Serializable {
+public class IndexedKey implements Serializable, Comparable {
+ private static final Log LOG = LogFactory.getLog(IndexedKey.class);
private byte index;
private Object key;
+ private boolean useSecondaryKey;
+ private boolean[] secondarySortOrder;
public IndexedKey(byte index, Object key) {
this.index = index;
@@ -140,4 +146,40 @@ public class IndexedKey implements Seria
}
return result;
}
+
+    /**
+     * Orders keys by input index first, then by key value.
+     * <p>
+     * When secondary-key sorting is enabled, the key is a compound tuple with the main key at
+     * position 0 and the secondary key at position 1. Main keys are compared first with
+     * {@link DataType#compare}. Ties are then broken on the secondary key using
+     * {@code secondarySortOrder}. Two compound keys therefore compare as equal only when both
+     * parts match, instead of whenever their secondary keys happen to match.
+     *
+     * @throws RuntimeException if a compound key field cannot be read
+     */
+    @Override
+    public int compareTo(Object o) {
+        IndexedKey that = (IndexedKey) o;
+        int res = index - that.getIndex();
+        if (res != 0) {
+            return res > 0 ? 1 : -1;
+        }
+        if (!useSecondaryKey) {
+            return DataType.compare(key, that.getKey());
+        }
+        Tuple thisCompoundKey = (Tuple) key;
+        Tuple thatCompoundKey = (Tuple) that.getKey();
+        try {
+            // Compare the main key first so records of different groups never compare as equal.
+            int mainRes = DataType.compare(thisCompoundKey.get(0), thatCompoundKey.get(0));
+            if (mainRes != 0) {
+                return mainRes;
+            }
+            return PigSecondaryKeyComparatorSpark.compareSecondaryKeys(thisCompoundKey.get(1),
+                    thatCompoundKey.get(1), secondarySortOrder);
+        } catch (ExecException e) {
+            throw new RuntimeException("IndexedKey#compareTo throws exception ", e);
+        }
+    }
+
+    /**
+     * Turns secondary-key ordering in {@link #compareTo(Object)} on or off. When it is on, the key
+     * is read as a compound tuple whose second field holds the secondary key.
+     *
+     * @param useSecondaryKey whether compareTo should use the compound-key ordering
+     */
+    public void setUseSecondaryKey(boolean useSecondaryKey) {
+        this.useSecondaryKey = useSecondaryKey;
+    }
+
+    /**
+     * Sets the ascending/descending flags used when comparing secondary keys. The array is
+     * stored by reference and is not copied.
+     *
+     * @param secondarySortOrder per-field sort direction flags for the secondary key
+     */
+    public void setSecondarySortOrder(boolean[] secondarySortOrder) {
+        this.secondarySortOrder = secondarySortOrder;
+    }
}
\ No newline at end of file
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java?rev=1751848&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java Fri Jul 8 02:37:36 2016
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import scala.Product2;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+import scala.runtime.AbstractFunction1;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.rdd.CoGroupedRDD;
+import org.apache.spark.rdd.RDD;
+
+
+/**
+ * Converts a {@link POJoinGroupSpark} into Spark operations:
+ * <ul>
+ * <li>every input RDD is mapped through its POLocalRearrange into (IndexedKey, value) pairs;</li>
+ * <li>the pairs of all inputs are cogrouped;</li>
+ * <li>the POPackage is applied to each cogrouped key.</li>
+ * </ul>
+ */
+public class JoinGroupSparkConverter implements RDDConverter<Tuple, Tuple, POJoinGroupSpark> {
+    private static final Log LOG = LogFactory
+            .getLog(JoinGroupSparkConverter.class);
+    private byte[] confBytes;
+
+    public JoinGroupSparkConverter(byte[] confBytes) {
+        this.confBytes = confBytes;
+    }
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POJoinGroupSpark op) throws IOException {
+        SparkUtil.assertPredecessorSizeGreaterThan(predecessors,
+                op, 0);
+        List<POLocalRearrange> lraOps = op.getLraOps();
+        POGlobalRearrangeSpark glaOp = op.getGlaOp();
+        POPackage pkgOp = op.getPkgOp();
+        int parallelism = SparkUtil.getParallelism(predecessors, glaOp);
+        List<RDD<Tuple2<IndexedKey, Tuple>>> rddAfterLRA = new ArrayList<RDD<Tuple2<IndexedKey, Tuple>>>();
+        boolean useSecondaryKey = glaOp.isUseSecondaryKey();
+
+        // Predecessor i is rearranged with lraOps.get(i). This assumes the predecessors arrive in
+        // the same order as the collapsed local rearranges.
+        for (int i = 0; i < predecessors.size(); i++) {
+            RDD<Tuple> rdd = predecessors.get(i);
+            rddAfterLRA.add(rdd.map(new LocalRearrangeFunction(lraOps.get(i), glaOp),
+                    SparkUtil.<IndexedKey, Tuple>getTuple2Manifest()));
+        }
+        if (rddAfterLRA.size() == 1 && useSecondaryKey) {
+            rddAfterLRA.set(0, handleSecondarySort(rddAfterLRA.get(0), parallelism));
+        }
+
+        CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>(
+                (Seq<RDD<? extends Product2<Object, ?>>>) (Object) (JavaConversions
+                        .asScalaBuffer(rddAfterLRA).toSeq()),
+                SparkUtil.getPartitioner(glaOp.getCustomPartitioner(), parallelism),
+                SparkUtil.getManifest(Object.class));
+
+        RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>> rdd =
+                (RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
+        return rdd.toJavaRDD().map(new GroupPkgFunction(pkgOp, this.confBytes)).rdd();
+    }
+
+    /**
+     * Sorts a single-input RDD by its compound IndexedKey. It then strips the secondary key, so
+     * the following cogroup groups on the main key only.
+     */
+    private RDD<Tuple2<IndexedKey, Tuple>> handleSecondarySort(
+            RDD<Tuple2<IndexedKey, Tuple>> rdd, int parallelism) {
+
+        JavaPairRDD<IndexedKey, Tuple> pairRDD = JavaPairRDD.fromRDD(rdd,
+                SparkUtil.getManifest(IndexedKey.class), SparkUtil.getManifest(Tuple.class));
+
+        // sortByKey already shuffles into `parallelism` range partitions and sorts each one. A
+        // preceding repartitionAndSortWithinPartitions would only add a second shuffle whose
+        // result the sort immediately discards.
+        JavaPairRDD<IndexedKey, Tuple> sortByKey = pairRDD.sortByKey(true, parallelism);
+        // NOTE(review): the CoGroupedRDD built in convert() shuffles again with its own
+        // partitioner, so this order is not guaranteed to survive into the grouped bags; confirm.
+        return sortByKey.mapToPair(new RemoveSecondaryKey()).rdd();
+    }
+
+    /** Replaces a compound (main, secondary) key with its main key (position 0). */
+    private static class RemoveSecondaryKey implements
+            PairFunction<Tuple2<IndexedKey, Tuple>, IndexedKey, Tuple>,
+            Serializable {
+
+        @Override
+        public Tuple2<IndexedKey, Tuple> call(Tuple2<IndexedKey, Tuple> t) throws Exception {
+            IndexedKey key = t._1();
+            Tuple compoundKey = (Tuple) key.getKey();
+            if (compoundKey.size() < 2) {
+                throw new RuntimeException("compoundKey.size() should be at least 2");
+            }
+            IndexedKey newKey = new IndexedKey(key.getIndex(), compoundKey.get(0));
+            return new Tuple2<IndexedKey, Tuple>(newKey, t._2());
+        }
+    }
+
+    /** Runs one POLocalRearrange over each input tuple, producing an (IndexedKey, value) pair. */
+    private static class LocalRearrangeFunction extends
+            AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements Serializable {
+
+        private final POLocalRearrange lra;
+
+        private boolean useSecondaryKey;
+        private boolean[] secondarySortOrder;
+
+        public LocalRearrangeFunction(POLocalRearrange lra, POGlobalRearrangeSpark glaOp) {
+            if (glaOp.isUseSecondaryKey()) {
+                this.useSecondaryKey = glaOp.isUseSecondaryKey();
+                this.secondarySortOrder = glaOp.getSecondarySortOrder();
+            }
+            this.lra = lra;
+        }
+
+        @Override
+        public Tuple2<IndexedKey, Tuple> apply(Tuple t) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("LocalRearrangeFunction in " + t);
+            }
+            Result result;
+            try {
+                lra.setInputs(null);
+                lra.attachInput(t);
+                result = lra.getNextTuple();
+
+                if (result == null) {
+                    throw new RuntimeException(
+                            "Null response found for LocalRearange on tuple: "
+                                    + t);
+                }
+
+                switch (result.returnStatus) {
+                    case POStatus.STATUS_OK:
+                        // (index, key, value without keys)
+                        Tuple resultTuple = (Tuple) result.result;
+                        Object key = resultTuple.get(1);
+                        IndexedKey indexedKey = new IndexedKey((Byte) resultTuple.get(0), key);
+                        if (useSecondaryKey) {
+                            indexedKey.setUseSecondaryKey(useSecondaryKey);
+                            indexedKey.setSecondarySortOrder(secondarySortOrder);
+                        }
+                        Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(indexedKey,
+                                (Tuple) resultTuple.get(2));
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("LocalRearrangeFunction out " + out);
+                        }
+                        return out;
+                    default:
+                        throw new RuntimeException(
+                                "Unexpected response code from operator "
+                                        + lra + " : " + result);
+                }
+            } catch (ExecException e) {
+                throw new RuntimeException(
+                        "Couldn't do LocalRearange on tuple: " + t, e);
+            }
+        }
+
+    }
+
+    /**
+     * Sends cogroup output, where each element is {key, bag[]}, to POPackage and then calls
+     * POPackage#getNextTuple to get the result.
+     */
+    private static class GroupPkgFunction implements
+            Function<Tuple2<IndexedKey, Seq<Seq<Tuple>>>, Tuple>, Serializable {
+
+        private final POPackage pkgOp;
+        private byte[] confBytes;
+        // Rebuilt lazily from confBytes on first use. It is transient because JobConf is not
+        // java.io.Serializable and must never travel with the serialized function.
+        private transient JobConf jobConf = null;
+
+        public GroupPkgFunction(POPackage pkgOp, byte[] confBytes) {
+            this.pkgOp = pkgOp;
+            this.confBytes = confBytes;
+        }
+
+        void initializeJobConf() {
+            jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
+            jobConf.set("pig.cachedbag.type", "default");
+            PigMapReduce.sJobConfInternal.set(jobConf);
+        }
+
+        @Override
+        public Tuple call(final Tuple2<IndexedKey, Seq<Seq<Tuple>>> input) {
+            if (jobConf == null) {
+                initializeJobConf();
+            }
+
+            try {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("GroupPkgFunction in " + input);
+                }
+
+                final PigNullableWritable key = new PigNullableWritable() {
+
+                    public Object getValueAsPigType() {
+                        IndexedKey keyTuple = input._1();
+                        return keyTuple.getKey();
+                    }
+                };
+                Object obj = input._2();
+                // XXX this is a hack for Spark 1.1.0: the type is an Array, not Seq
+                Seq<Tuple>[] bags = (Seq<Tuple>[]) obj;
+                List<Iterator<NullableTuple>> tupleIterators = new ArrayList<Iterator<NullableTuple>>();
+                for (int i = 0; i < bags.length; i++) {
+                    Iterator<Tuple> iterator = JavaConversions
+                            .asJavaCollection(bags[i]).iterator();
+                    // Tag each value with the position of the bag (input) it came from.
+                    final int index = i;
+                    tupleIterators.add(new IteratorTransform<Tuple, NullableTuple>(
+                            iterator) {
+                        @Override
+                        protected NullableTuple transform(Tuple next) {
+                            NullableTuple nullableTuple = new NullableTuple(next);
+                            nullableTuple.setIndex((byte) index);
+                            return nullableTuple;
+                        }
+                    });
+                }
+
+                pkgOp.setInputs(null);
+                pkgOp.attachInput(key, new IteratorUnion<NullableTuple>(tupleIterators.iterator()));
+                Result result = pkgOp.getNextTuple();
+                if (result == null) {
+                    throw new RuntimeException(
+                            "Null response found for Package on key: " + key);
+                }
+                Tuple out;
+                switch (result.returnStatus) {
+                    case POStatus.STATUS_OK:
+                        // (key, {(value)...})
+                        out = (Tuple) result.result;
+                        break;
+                    case POStatus.STATUS_NULL:
+                        out = null;
+                        break;
+                    default:
+                        throw new RuntimeException(
+                                "Unexpected response code from operator "
+                                        + pkgOp + " : " + result + " "
+                                        + result.returnStatus);
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("GroupPkgFunction out " + out);
+                }
+                return out;
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /** Concatenates a sequence of iterators into a single iterator. */
+    private static class IteratorUnion<T> implements Iterator<T> {
+
+        private final Iterator<Iterator<T>> iterators;
+
+        private Iterator<T> current;
+
+        public IteratorUnion(Iterator<Iterator<T>> iterators) {
+            super();
+            this.iterators = iterators;
+        }
+
+        @Override
+        public boolean hasNext() {
+            // Skip exhausted or empty inner iterators with a loop rather than recursion.
+            while (current == null || !current.hasNext()) {
+                if (!iterators.hasNext()) {
+                    return false;
+                }
+                current = iterators.next();
+            }
+            return true;
+        }
+
+        @Override
+        public T next() {
+            // Iterator contract: next() must work without a preceding hasNext() call. It must
+            // throw NoSuchElementException (not NPE) once every inner iterator is exhausted.
+            if (!hasNext()) {
+                throw new java.util.NoSuchElementException();
+            }
+            return current.next();
+        }
+
+        @Override
+        public void remove() {
+            if (current == null) {
+                throw new IllegalStateException("next() has not been called");
+            }
+            current.remove();
+        }
+
+    }
+}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java?rev=1751848&r1=1751847&r2=1751848&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java Fri Jul 8 02:37:36 2016
@@ -66,7 +66,7 @@ class PigSecondaryKeyComparatorSpark imp
}
}
- private int compareSecondaryKeys(Object o1, Object o2, boolean[] asc) {
+ public static int compareSecondaryKeys(Object o1, Object o2, boolean[] asc) {
int rc = 0;
if (o1 != null && o2 != null && o1 instanceof Tuple && o2 instanceof Tuple) {
// objects are Tuples, we may need to apply sort order inside them
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java?rev=1751848&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java Fri Jul 8 02:37:36 2016
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import java.util.List;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Collapse POLocalRearrange,POGlobalRearrange and POPackage to POJoinGroupSpark to reduce unnecessary map operations in the join/group
+ */
+public class POJoinGroupSpark extends PhysicalOperator {
+ private List<POLocalRearrange> lraOps;
+ private POGlobalRearrangeSpark glaOp;
+ private POPackage pkgOp;
+ private List<PhysicalOperator> predecessors;
+
+ public POJoinGroupSpark(List<POLocalRearrange> lraOps, POGlobalRearrangeSpark glaOp, POPackage pkgOp){
+ super(glaOp.getOperatorKey());
+ this.lraOps = lraOps;
+ this.glaOp = glaOp;
+ this.pkgOp = pkgOp;
+ }
+
+ public List<POLocalRearrange> getLraOps() {
+ return lraOps;
+ }
+
+ public POGlobalRearrangeSpark getGlaOp() {
+ return glaOp;
+ }
+
+ public POPackage getPkgOp() {
+ return pkgOp;
+ }
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
+
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ return true;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ return false;
+ }
+
+ @Override
+ public String name() {
+ return getAliasString() + "POJoinGroupSpark"+ "["
+ + DataType.findTypeName(resultType) + "]" + " - "
+ + mKey.toString();
+ }
+
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ return null;
+ }
+
+ public void setPredecessors(List<PhysicalOperator> predecessors) {
+ this.predecessors = predecessors;
+ }
+
+ public List<PhysicalOperator> getPredecessors() {
+ return predecessors;
+ }
+}
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java?rev=1751848&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java Fri Jul 8 02:37:36 2016
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Collapse LocalRearrange,GlobalRearrange,Package to POJoinGroupSpark to reduce unnecessary
+ * map operations to optimize join/group. Detail see PIG-4797
+ */
+public class JoinGroupOptimizerSpark extends SparkOpPlanVisitor {
+ private static final Log LOG = LogFactory.getLog(JoinGroupOptimizerSpark.class);
+
+ public JoinGroupOptimizerSpark(SparkOperPlan plan) {
+ super(plan, new DependencyOrderWalker<SparkOperator, SparkOperPlan>(plan, true));
+ }
+
+ @Override
+ public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+ if (sparkOp.physicalPlan != null) {
+ GlobalRearrangeDiscover glrDiscover = new GlobalRearrangeDiscover(sparkOp.physicalPlan);
+ glrDiscover.visit();
+ List<PhysicalPlan> plans = glrDiscover.getPlansWithJoinAndGroup();
+ handlePlans(plans);
+ }
+
+ }
+
+ private void handlePlans(List<PhysicalPlan> plans) throws VisitorException {
+ for(int i=0;i<plans.size();i++){
+ PhysicalPlan planWithJoinAndGroup = plans.get(i);
+ POGlobalRearrangeSpark glrSpark = PlanHelper.getPhysicalOperators(planWithJoinAndGroup,POGlobalRearrangeSpark.class).get(0);
+ if (verifyJoinOrGroupCase(plans.get(i), glrSpark)) {
+ try {
+ restructSparkOp(planWithJoinAndGroup, glrSpark);
+ } catch (PlanException e) {
+ throw new RuntimeException("GlobalRearrangeDiscover#visitSparkOp fails: ", e);
+ }
+ }
+ }
+ }
+
+ static class GlobalRearrangeDiscover extends PhyPlanVisitor {
+ private List<PhysicalPlan> plansWithJoinAndGroup = new ArrayList<PhysicalPlan>();
+ public GlobalRearrangeDiscover(PhysicalPlan plan) {
+ super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+ plan));
+ }
+
+ @Override
+ public void visitGlobalRearrange(POGlobalRearrange glr) throws VisitorException {
+ PhysicalPlan currentPlan = this.mCurrentWalker.getPlan();//If there are POSplit, we need traverse the POSplit.getPlans(), so use mCurrentWalker.getPlan()
+ if( currentPlan != null) {
+ plansWithJoinAndGroup.add(currentPlan);
+ }else{
+ LOG.info("GlobalRearrangeDiscover#currentPlan is null");
+ }
+
+ }
+
+ public List<PhysicalPlan> getPlansWithJoinAndGroup() {
+ return plansWithJoinAndGroup;
+ }
+ }
+
+ //collapse LRA,GRA,PKG to POJoinGroupSpark
+ private void restructSparkOp(PhysicalPlan plan,POGlobalRearrangeSpark glaOp) throws PlanException {
+
+ List<PhysicalOperator> predes = plan.getPredecessors(glaOp);
+ if (predes != null) {
+ List<POLocalRearrange> lraOps = new ArrayList<POLocalRearrange>();
+ List<PhysicalOperator> allPredsOfLRA = new ArrayList<PhysicalOperator>();
+
+ //Get the predecessors of POJoinGroupSpark with correct order after JoinOptimizationSpark
+ //For other PhysicalOperator, we usually use OperatorPlan#getPredecessor(op) to get predecessors and sort predecessors[JobGraphBuilder#getPredecessors] to
+ //get the predecessor with correct order(in common case, PhysicalOperator
+ //with small OperatorKey must be executed before that with bigger OperatorKey),but this is not suitable for POJoinGroupSpark
+ //Give an example to explain this:
+ //original:
+ //POLOAD(scope-1) POLOAD(scope-2)
+ // \ /
+ // POFOREach(scope-3) POLocalRearrange(scope-5)
+ // \ /
+ // POLocalRearrange(scope-4) POLocalRearrange(scope-5)
+ // \ /
+ // POGlobalRearrange(scope-6)
+ // |
+ // POPackage(scope-7)
+ //after JoinOptimizationSpark:
+ //POLOAD(scope-1) POLOAD(scope-2)
+ // \ /
+ // POFOREach(scope-3) /
+ // \ /
+ // POJoinGroupSpark(scope-8)
+
+ //the predecessor of POJoinGroupSpark(scope-8) is POForEach(scope-3) and POLoad(scope-2) because they are
+ //the predecessor of POLocalRearrange(scope-4) and POLocalRearrange(scope-5) while we will get
+ //will be POLoad(scope-2) and POForEach(scope-3) if use OperatorPlan#getPredecessor(op)to gain predecessors and sort predecessors
+ Collections.sort(predes);
+ for (PhysicalOperator lra : predes) {
+ lraOps.add((POLocalRearrange) lra);
+ List<PhysicalOperator> predOfLRAList = plan.getPredecessors(lra);
+ if( predOfLRAList != null && predOfLRAList.size() ==1) {
+ PhysicalOperator predOfLRA = predOfLRAList.get(0);
+ plan.disconnect(predOfLRA, lra);
+ allPredsOfLRA.add(predOfLRA);
+ }
+ }
+
+ POPackage pkgOp = (POPackage) plan.getSuccessors(glaOp).get(0);
+ PhysicalOperator pkgSuccessor = plan.getSuccessors(pkgOp).get(0);
+ POJoinGroupSpark joinSpark = new POJoinGroupSpark(lraOps, glaOp, pkgOp);
+ if(allPredsOfLRA.size()>0) {
+ joinSpark.setPredecessors(allPredsOfLRA);
+ }
+ plan.add(joinSpark);
+
+ for (PhysicalOperator predOfLRA : allPredsOfLRA) {
+ plan.connect(predOfLRA, joinSpark);
+ }
+
+ plan.disconnect(pkgOp, pkgSuccessor);
+ plan.connect(joinSpark, pkgSuccessor);
+ for (POLocalRearrange lra : lraOps) {
+ plan.remove(lra);
+ }
+ plan.remove(glaOp);
+ plan.remove(pkgOp);
+ }
+ }
+
+ private boolean verifyJoinOrGroupCase(PhysicalPlan plan, POGlobalRearrangeSpark glaOp) {
+ List<PhysicalOperator> lraOps = plan.getPredecessors(glaOp);
+ List<PhysicalOperator> pkgOps = plan.getSuccessors(glaOp);
+ boolean isAllPredecessorLRA = isAllPredecessorLRA(lraOps);
+ boolean isSuccessorPKG = isSuccessorPKG(pkgOps);
+ return isAllPredecessorLRA && isSuccessorPKG;
+ }
+
+ private boolean isSuccessorPKG(List<PhysicalOperator> pkgOps) {
+ boolean result = false;
+ if (pkgOps != null && (pkgOps.size() == 1)) {
+ if (pkgOps.get(0) instanceof POPackage) {
+ result = true;
+ }
+ } else {
+ result = false;
+ }
+
+
+ return result;
+ }
+
+ private boolean isAllPredecessorLRA(List<PhysicalOperator> lraOps) {
+ boolean result = true;
+ if (lraOps != null) {
+ for (PhysicalOperator lraOp : lraOps) {
+ if (!(lraOp instanceof POLocalRearrange)) {
+ result = false;
+ break;
+ }
+ }
+ } else {
+ result = false;
+ }
+
+ return result;
+ }
+}