Posted to commits@pig.apache.org by xu...@apache.org on 2016/01/29 04:45:26 UTC
svn commit: r1727472 - in /pig/branches/spark:
src/org/apache/pig/backend/hadoop/executionengine/spark/
src/org/apache/pig/backend/hadoop/executionengine/spark/converter/
src/org/apache/pig/backend/hadoop/executionengine/spark/operator/
src/org/apache/...
Author: xuefu
Date: Fri Jan 29 03:45:26 2016
New Revision: 1727472
URL: http://svn.apache.org/viewvc?rev=1727472&view=rev
Log:
PIG-4709: Improve performance of GROUPBY operator on Spark (Pallavi via Xuefu)
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
pig/branches/spark/test/org/apache/pig/test/TestCombiner.java
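
At its core, the change targets GROUP BY plans whose foreach applies only algebraic functions: the GlobalRearrange/Package pair (a groupByKey-style shuffle that ships every tuple) is replaced with Spark's reduceByKey, which pre-combines partial aggregates on the map side. Below is a minimal sketch of that contrast in the plain Spark Java API, with illustrative names and data (not code from this patch):

    import java.util.Arrays;

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import scala.Tuple2;

    public class ReduceBySketch {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext("local", "reduceby-sketch");
            JavaPairRDD<String, Long> pairs = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<>("a", 1L), new Tuple2<>("a", 1L), new Tuple2<>("b", 1L)));

            // groupByKey-style (old GlobalRearrange path): every value crosses
            // the shuffle; counting happens only after the full bag is assembled.
            JavaPairRDD<String, Long> viaGroup = pairs.groupByKey().mapValues(vs -> {
                long n = 0;
                for (long v : vs) n += v;
                return n;
            });

            // reduceByKey (new POReduceBySpark path): values are pre-combined per
            // map partition, so only one partial count per key is shuffled.
            JavaPairRDD<String, Long> viaReduce = pairs.reduceByKey((x, y) -> x + y);

            System.out.println(viaGroup.collect() + " == " + viaReduce.collect());
            sc.stop();
        }
    }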
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1727472&r1=1727471&r2=1727472&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Fri Jan 29 03:45:26 2016
@@ -45,6 +45,7 @@ import org.apache.pig.PigConfiguration;
import org.apache.pig.PigConstants;
import org.apache.pig.PigException;
import org.apache.pig.backend.BackendException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.Launcher;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -60,6 +61,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
@@ -82,6 +84,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.RankConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.ReduceByConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.SkewedJoinConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.SortConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.SplitConverter;
@@ -90,7 +93,9 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.CombinerOptimizer;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.MultiQueryOptimizerSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.NoopFilterRemover;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.ParallelismSetter;
@@ -206,6 +211,8 @@ public class SparkLauncher extends Launc
convertMap.put(PORank.class, new RankConverter());
convertMap.put(POStream.class, new StreamConverter(confBytes));
convertMap.put(POFRJoin.class, new FRJoinConverter());
+ convertMap.put(POReduceBySpark.class, new ReduceByConverter());
+ convertMap.put(POPreCombinerLocalRearrange.class, new LocalRearrangeConverter());
sparkPlanToRDD(sparkplan, convertMap, sparkStats, jobConf);
cleanUpSparkJob();
@@ -215,14 +222,27 @@ public class SparkLauncher extends Launc
}
private void optimize(PigContext pc, SparkOperPlan plan) throws IOException {
- String prop = pc.getProperties().getProperty(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY);
- if (!pc.inIllustrator && !("true".equals(prop))) {
+
+ Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
+
+ // Should be the first optimizer as it introduces new operators to the plan.
+ boolean noCombiner = conf.getBoolean(PigConfiguration.PIG_EXEC_NO_COMBINER, false);
+ if (!pc.inIllustrator && !noCombiner) {
+ CombinerOptimizer combinerOptimizer = new CombinerOptimizer(plan);
+ combinerOptimizer.visit();
+ if (LOG.isDebugEnabled()) {
+ System.out.println("after combiner optimization:");
+ explain(plan, System.out, "text", true);
+ }
+ }
+
+ boolean noSecondaryKey = conf.getBoolean(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY, false);
+ if (!pc.inIllustrator && !noSecondaryKey) {
SecondaryKeyOptimizerSpark skOptimizer = new SecondaryKeyOptimizerSpark(plan);
skOptimizer.visit();
}
- boolean isAccum =
- Boolean.valueOf(pc.getProperties().getProperty("opt.accumulator", "true"));
+ boolean isAccum = conf.getBoolean("opt.accumulator", true);
if (isAccum) {
AccumulatorOptimizer accum = new AccumulatorOptimizer(plan);
accum.visit();
@@ -233,8 +253,7 @@ public class SparkLauncher extends Launc
NoopFilterRemover fRem = new NoopFilterRemover(plan);
fRem.visit();
- boolean isMultiQuery =
- Boolean.valueOf(pc.getProperties().getProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "true"));
+ boolean isMultiQuery = conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true);
if (LOG.isDebugEnabled()) {
System.out.println("before multiquery optimization:");
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java?rev=1727472&r1=1727471&r2=1727472&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java Fri Jan 29 03:45:26 2016
@@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
@@ -35,7 +34,6 @@ import org.apache.commons.logging.LogFac
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
-import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.spark.HashPartitioner;
@@ -174,82 +172,6 @@ public class GlobalRearrangeConverter im
}
}
- private static class PigSecondaryKeyComparatorSpark implements Comparator, Serializable {
- private static final long serialVersionUID = 1L;
-
- private static boolean[] secondarySortOrder;
-
- public PigSecondaryKeyComparatorSpark(boolean[] pSecondarySortOrder) {
- secondarySortOrder = pSecondarySortOrder;
- }
-
- @Override
- public int compare(Object o1, Object o2) {
- Tuple t1 = (Tuple) o1;
- Tuple t2 = (Tuple) o2;
- try {
- if ((t1.size() < 3) || (t2.size() < 3)) {
- throw new RuntimeException("tuple size must bigger than 3, tuple[0] stands for index, tuple[1]" +
- "stands for the compound key, tuple[3] stands for the value");
- }
- Tuple compoundKey1 = (Tuple) t1.get(1);
- Tuple compoundKey2 = (Tuple) t2.get(1);
- if ((compoundKey1.size() < 2) || (compoundKey2.size() < 2)) {
- throw new RuntimeException("compoundKey size must bigger than, compoundKey[0] stands for firstKey," +
- "compoundKey[1] stands for secondaryKey");
- }
- Object secondaryKey1 = compoundKey1.get(1);
- Object secondaryKey2 = compoundKey2.get(1);
- int res = compareSecondaryKeys(secondaryKey1, secondaryKey2, secondarySortOrder);
- if (LOG.isDebugEnabled()) {
- LOG.debug("t1:" + t1 + "t2:" + t2 + " res:" + res);
- }
- return res;
- } catch (ExecException e) {
- throw new RuntimeException("Fail to get the compoundKey", e);
- }
- }
-
- private int compareSecondaryKeys(Object o1, Object o2, boolean[] asc) {
- int rc = 0;
- if (o1 != null && o2 != null && o1 instanceof Tuple && o2 instanceof Tuple) {
- // objects are Tuples, we may need to apply sort order inside them
- Tuple t1 = (Tuple) o1;
- Tuple t2 = (Tuple) o2;
- int sz1 = t1.size();
- int sz2 = t2.size();
- if (sz2 < sz1) {
- return 1;
- } else if (sz2 > sz1) {
- return -1;
- } else {
- for (int i = 0; i < sz1; i++) {
- try {
- rc = DataType.compare(t1.get(i), t2.get(i));
- if (rc != 0 && asc != null && asc.length > 1 && !asc[i])
- rc *= -1;
- if ((t1.get(i) == null) || (t2.get(i) == null)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("t1.get(i) is:" + t1.get(i) + " t2.get(i) is:" + t2.get(i));
- }
- }
- if (rc != 0) break;
- } catch (ExecException e) {
- throw new RuntimeException("Unable to compare tuples", e);
- }
- }
- }
- } else {
- // objects are NOT Tuples, delegate to DataType.compare()
- rc = DataType.compare(o1, o2);
- }
- // apply sort order for keys that are not tuples or for whole tuples
- if (asc != null && asc.length == 1 && !asc[0])
- rc *= -1;
- return rc;
- }
- }
-
/**
* Function that extract keys from locally rearranged tuples.
*/
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java?rev=1727472&r1=1727471&r2=1727472&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java Fri Jan 29 03:45:26 2016
@@ -27,21 +27,21 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
import org.apache.spark.rdd.RDD;
@SuppressWarnings({ "serial" })
public class LocalRearrangeConverter implements
- RDDConverter<Tuple, Tuple, POLocalRearrange> {
+ RDDConverter<Tuple, Tuple, PhysicalOperator> {
private static final Log LOG = LogFactory
- .getLog(GlobalRearrangeConverter.class);
+ .getLog(LocalRearrangeConverter.class);
@Override
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
- POLocalRearrange physicalOperator) throws IOException {
+ PhysicalOperator physicalOperator) throws IOException {
SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
RDD<Tuple> rdd = predecessors.get(0);
// call local rearrange to get key and value
@@ -53,14 +53,17 @@ public class LocalRearrangeConverter imp
private static class LocalRearrangeFunction extends
AbstractFunction1<Tuple, Tuple> implements Serializable {
- private final POLocalRearrange physicalOperator;
+ private final PhysicalOperator physicalOperator;
- public LocalRearrangeFunction(POLocalRearrange physicalOperator) {
+ public LocalRearrangeFunction(PhysicalOperator physicalOperator) {
this.physicalOperator = physicalOperator;
}
@Override
public Tuple apply(Tuple t) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("LocalRearrangeFunction in " + t);
+ }
Result result;
try {
physicalOperator.setInputs(null);
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java?rev=1727472&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java Fri Jan 29 03:45:26 2016
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Utility class that handles secondary key for sorting.
+ */
+class PigSecondaryKeyComparatorSpark implements Comparator, Serializable {
+ private static final Log LOG = LogFactory.getLog(PigSecondaryKeyComparatorSpark.class);
+ private static final long serialVersionUID = 1L;
+
+ private static boolean[] secondarySortOrder;
+
+ public PigSecondaryKeyComparatorSpark(boolean[] pSecondarySortOrder) {
+ secondarySortOrder = pSecondarySortOrder;
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ Tuple t1 = (Tuple) o1;
+ Tuple t2 = (Tuple) o2;
+ try {
+ if ((t1.size() < 3) || (t2.size() < 3)) {
+ throw new RuntimeException("tuple size must bigger than 3, tuple[0] stands for index, tuple[1]" +
+ "stands for the compound key, tuple[3] stands for the value");
+ }
+ Tuple compoundKey1 = (Tuple) t1.get(1);
+ Tuple compoundKey2 = (Tuple) t2.get(1);
+ if ((compoundKey1.size() < 2) || (compoundKey2.size() < 2)) {
+ throw new RuntimeException("compoundKey size must bigger than, compoundKey[0] stands for firstKey," +
+ "compoundKey[1] stands for secondaryKey");
+ }
+ Object secondaryKey1 = compoundKey1.get(1);
+ Object secondaryKey2 = compoundKey2.get(1);
+ int res = compareSecondaryKeys(secondaryKey1, secondaryKey2, secondarySortOrder);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("t1:" + t1 + "t2:" + t2 + " res:" + res);
+ }
+ return res;
+ } catch (ExecException e) {
+ throw new RuntimeException("Fail to get the compoundKey", e);
+ }
+ }
+
+ private int compareSecondaryKeys(Object o1, Object o2, boolean[] asc) {
+ int rc = 0;
+ if (o1 != null && o2 != null && o1 instanceof Tuple && o2 instanceof Tuple) {
+ // objects are Tuples, we may need to apply sort order inside them
+ Tuple t1 = (Tuple) o1;
+ Tuple t2 = (Tuple) o2;
+ int sz1 = t1.size();
+ int sz2 = t2.size();
+ if (sz2 < sz1) {
+ return 1;
+ } else if (sz2 > sz1) {
+ return -1;
+ } else {
+ for (int i = 0; i < sz1; i++) {
+ try {
+ rc = DataType.compare(t1.get(i), t2.get(i));
+ if (rc != 0 && asc != null && asc.length > 1 && !asc[i])
+ rc *= -1;
+ if ((t1.get(i) == null) || (t2.get(i) == null)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("t1.get(i) is:" + t1.get(i) + " t2.get(i) is:" + t2.get(i));
+ }
+ }
+ if (rc != 0) break;
+ } catch (ExecException e) {
+ throw new RuntimeException("Unable to compare tuples", e);
+ }
+ }
+ }
+ } else {
+ // objects are NOT Tuples, delegate to DataType.compare()
+ rc = DataType.compare(o1, o2);
+ }
+ // apply sort order for keys that are not tuples or for whole tuples
+ if (asc != null && asc.length == 1 && !asc[0])
+ rc *= -1;
+ return rc;
+ }
+}
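
For orientation, the comparator above assumes each tuple is laid out as (index, compoundKey, value), where the compound key is itself a (firstKey, secondaryKey) tuple, and only the secondary component participates in the comparison. A sketch of building such a record with Pig's TupleFactory (illustrative values only, not code from this patch):

    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class SecondaryKeyLayoutSketch {
        public static Tuple sampleRecord() throws ExecException {
            TupleFactory tf = TupleFactory.getInstance();

            Tuple compoundKey = tf.newTuple(2);
            compoundKey.set(0, "groupKey");   // first (grouping) key, not compared here
            compoundKey.set(1, 42);           // secondary key, the only part compared

            Tuple record = tf.newTuple(3);
            record.set(0, (byte) 0);          // index of the input
            record.set(1, compoundKey);       // compound key, read via t.get(1)
            record.set(2, tf.newTuple());     // value
            return record;
        }
    }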
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java?rev=1727472&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java Fri Jan 29 03:45:26 2016
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import scala.Tuple2;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.AbstractFunction2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.rdd.PairRDDFunctions;
+import org.apache.spark.rdd.RDD;
+
+@SuppressWarnings({"serial"})
+public class ReduceByConverter implements RDDConverter<Tuple, Tuple, POReduceBySpark> {
+ private static final Log LOG = LogFactory.getLog(ReduceByConverter.class);
+
+ private static final TupleFactory tf = TupleFactory.getInstance();
+
+ @Override
+ public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POReduceBySpark op) throws IOException {
+ SparkUtil.assertPredecessorSize(predecessors, op, 1);
+ int parallelism = SparkUtil.getParallelism(predecessors, op);
+
+ RDD<Tuple> rdd = predecessors.get(0);
+
+ JavaRDD<Tuple2<IndexedKey, Tuple>> rddPair;
+ if (op.isUseSecondaryKey()) {
+ rddPair = handleSecondarySort(rdd, op, parallelism);
+ } else {
+ JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
+ rddPair = jrdd.map(new ToKeyValueFunction(op));
+ }
+ PairRDDFunctions<IndexedKey, Tuple> pairRDDFunctions
+ = new PairRDDFunctions<>(rddPair.rdd(),
+ SparkUtil.getManifest(IndexedKey.class),
+ SparkUtil.getManifest(Tuple.class), null);
+
+ RDD<Tuple2<IndexedKey, Tuple>> tupleRDD = pairRDDFunctions.reduceByKey(
+ SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism),
+ new MergeValuesFunction(op));
+ LOG.debug("Custom Partitioner and parallelims used : " + op.getCustomPartitioner() + ", " + parallelism);
+
+ return tupleRDD.map(new ToTupleFunction(op), SparkUtil.getManifest(Tuple.class));
+ }
+
+ private JavaRDD<Tuple2<IndexedKey, Tuple>> handleSecondarySort(
+ RDD<Tuple> rdd, POReduceBySpark op, int parallelism) {
+
+ RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyNullValueFunction(),
+ SparkUtil.<Tuple, Object>getTuple2Manifest());
+
+ JavaPairRDD<Tuple, Object> pairRDD = new JavaPairRDD<Tuple, Object>(rddPair,
+ SparkUtil.getManifest(Tuple.class),
+ SparkUtil.getManifest(Object.class));
+
+ // first sort the tuples by the secondary key, since useSecondaryKey sort is enabled here
+ JavaPairRDD<Tuple, Object> sorted = pairRDD.repartitionAndSortWithinPartitions(
+ new HashPartitioner(parallelism),
+ new PigSecondaryKeyComparatorSpark(op.getSecondarySortOrder()));
+ JavaRDD<Tuple> jrdd = sorted.keys();
+ JavaRDD<Tuple2<IndexedKey, Tuple>> jrddPair = jrdd.map(new ToKeyValueFunction(op));
+ return jrddPair;
+ }
+
+ private static class ToKeyNullValueFunction extends
+ AbstractFunction1<Tuple, Tuple2<Tuple, Object>> implements
+ Serializable {
+
+ @Override
+ public Tuple2<Tuple, Object> apply(Tuple t) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ToKeyNullValueFunction in " + t);
+ }
+
+ Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(t, null);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ToKeyNullValueFunction out " + out);
+ }
+
+ return out;
+ }
+ }
+
+ /**
+ * Converts an incoming locally rearranged tuple of the form
+ * (index, key, value) into a Tuple2<key, Tuple(key, value)>.
+ */
+ private static class ToKeyValueFunction implements
+ Function<Tuple, Tuple2<IndexedKey, Tuple>>, Serializable {
+
+ private POReduceBySpark poReduce = null;
+
+ public ToKeyValueFunction(POReduceBySpark poReduce) {
+ this.poReduce = poReduce;
+ }
+
+ @Override
+ public Tuple2<IndexedKey, Tuple> call(Tuple t) {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ToKeyValueFunction in " + t);
+ }
+
+ Object key;
+ if ((poReduce != null) && (poReduce.isUseSecondaryKey())) {
+ key = ((Tuple) t.get(1)).get(0);
+ } else {
+ key = t.get(1);
+ }
+
+ Tuple tupleWithKey = tf.newTuple();
+ tupleWithKey.append(key);
+ tupleWithKey.append(t.get(2));
+
+ Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(new IndexedKey((Byte) t.get(0), key), tupleWithKey);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ToKeyValueFunction out " + out);
+ }
+
+ return out;
+ } catch (ExecException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Given two input tuples, this function outputs the resultant tuple.
+ * Additionally, it packages the input tuples to ensure the Algebraic Functions can work on them.
+ */
+ private static final class MergeValuesFunction extends AbstractFunction2<Tuple, Tuple, Tuple>
+ implements Serializable {
+ private final POReduceBySpark poReduce;
+
+ public MergeValuesFunction(POReduceBySpark poReduce) {
+ this.poReduce = poReduce;
+ }
+
+ @Override
+ public Tuple apply(Tuple v1, Tuple v2) {
+ LOG.debug("MergeValuesFunction in : " + v1 + " , " + v2);
+ Tuple result = tf.newTuple();
+ DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
+ Tuple t = new DefaultTuple();
+ try {
+ // Package the input tuples so they can be processed by Algebraic functions.
+ Object key = v1.get(0);
+ bag.add((Tuple) v1.get(1));
+ bag.add((Tuple) v2.get(1));
+ t.append(key);
+ t.append(bag);
+
+ poReduce.getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true});
+ Tuple packagedTuple = (Tuple) poReduce.getPkgr().getNext().result;
+
+ // Perform the operation
+ LOG.debug("MergeValuesFunction packagedTuple : " + t);
+ poReduce.attachInput(packagedTuple);
+ Result r = poReduce.getNext(poReduce.getResultType());
+
+ // Ensure output is consistent with the output of KeyValueFunction
+ // If we return r.result, the result will be something like this:
+ // (ABC,(2),(3)) - A tuple with key followed by values.
+ // But, we want the result to look like this:
+ // (ABC,((2),(3))) - A tuple with key and a value tuple (containing values).
+ // Hence, the construction of a new value tuple
+ result.append(t.get(0));
+ Tuple valueTuple = tf.newTuple();
+ for (Object o : ((Tuple) r.result).getAll()) {
+ if (!o.equals(key)) {
+ valueTuple.append(o);
+ }
+ }
+ result.append(valueTuple);
+ LOG.debug("MergeValuesFunction out : " + result);
+ return result;
+ } catch (ExecException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * This function repackages the tuple as required by the operator's packager.
+ */
+ private static final class ToTupleFunction extends AbstractFunction1<Tuple2<IndexedKey, Tuple>, Tuple>
+ implements Serializable {
+
+ private final POReduceBySpark poReduce;
+
+ public ToTupleFunction(POReduceBySpark poReduce) {
+ this.poReduce = poReduce;
+ }
+
+ @Override
+ public Tuple apply(Tuple2<IndexedKey, Tuple> v1) {
+ LOG.debug("ToTupleFunction in : " + v1);
+ DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
+ Tuple t = new DefaultTuple();
+ Tuple packagedTuple = null;
+ try {
+ Object key = v1._2().get(0);
+ bag.add((Tuple) v1._2().get(1));
+ t.append(key);
+ t.append(bag);
+ poReduce.getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true});
+ packagedTuple = (Tuple) poReduce.getPkgr().getNext().result;
+ } catch (ExecException e) {
+ throw new RuntimeException(e);
+ }
+ LOG.debug("ToTupleFunction out : " + packagedTuple);
+ return packagedTuple;
+ }
+ }
+}
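
MergeValuesFunction relies on the Algebraic contract: reduceByKey may merge values pairwise in any order and grouping, which is safe only because the Intermediate phase of an algebraic UDF is associative. A toy sketch of the three phases for a COUNT-like function (plain Java, not Pig's actual builtin classes):

    // COUNT-like decomposition: Initial tags each input with a partial count
    // of 1, Intermediate (what reduceByKey applies) adds partials
    // associatively, and Final surfaces the result in the post-reduce foreach.
    public final class AlgebraicCountSketch {
        static long initial(Object tupleValue) {
            return 1L; // one partial count per input tuple
        }

        static long intermediate(long a, long b) {
            return a + b; // associative, so pairwise merge order does not matter
        }

        static long fin(long partial) {
            return partial;
        }

        public static void main(String[] args) {
            long left = intermediate(initial("x"), initial("y"));  // partial = 2
            long total = fin(intermediate(left, initial("z")));    // total = 3
            System.out.println(total); // any association of the merges prints 3
        }
    }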
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java?rev=1727472&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java Fri Jan 29 03:45:26 2016
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import java.util.List;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.plan.OperatorKey;
+
+/**
+ * ReduceBy operator that maps to Spark's reduceByKey.
+ * Extends POForEach and adds packager, secondary sort and partitioner support.
+ */
+public class POReduceBySpark extends POForEach {
+ private String customPartitioner;
+
+ protected Packager pkgr;
+
+ public POReduceBySpark(OperatorKey k, int rp, List<PhysicalPlan> inp, List<Boolean> isToBeFlattened, Packager pkgr){
+ super(k, rp, inp, isToBeFlattened);
+ this.pkgr = pkgr;
+ }
+
+ public Packager getPkgr() {
+ return pkgr;
+ }
+
+ @Override
+ public String name() {
+ return getAliasString() + "Reduce By" + "(" + getFlatStr() + ")" + "["
+ + DataType.findTypeName(resultType) + "]" + " - "
+ + mKey.toString();
+ }
+
+ protected String getFlatStr() {
+ if (isToBeFlattenedArray == null) {
+ return "";
+ }
+ StringBuilder sb = new StringBuilder();
+ for (Boolean b : isToBeFlattenedArray) {
+ sb.append(b);
+ sb.append(',');
+ }
+ if (sb.length() > 0) {
+ sb.deleteCharAt(sb.length()-1);
+ }
+ return sb.toString();
+ }
+
+ // Whether to use a secondary key
+ private boolean useSecondaryKey;
+ // Sort order for secondary keys
+ private boolean[] secondarySortOrder;
+
+ public boolean isUseSecondaryKey() {
+ return useSecondaryKey;
+ }
+
+ public void setUseSecondaryKey(boolean useSecondaryKey) {
+ this.useSecondaryKey = useSecondaryKey;
+ }
+
+ public boolean[] getSecondarySortOrder() {
+ return secondarySortOrder;
+ }
+
+ public void setSecondarySortOrder(boolean[] secondarySortOrder) {
+ this.secondarySortOrder = secondarySortOrder;
+ }
+
+ public String getCustomPartitioner() {
+ return customPartitioner;
+ }
+
+ public void setCustomPartitioner(String customPartitioner) {
+ this.customPartitioner = customPartitioner;
+ }
+}
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java?rev=1727472&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java Fri Jan 29 03:45:26 2016
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.util.CombinerOptimizerUtil;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+import org.apache.pig.impl.util.Pair;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * This class goes through the physical plan and replaces GlobalRearrange with ReduceBy
+ * where there are algebraic operations.
+ */
+public class CombinerOptimizer extends SparkOpPlanVisitor {
+
+ private static Log LOG = LogFactory.getLog(CombinerOptimizer.class);
+
+ public CombinerOptimizer(SparkOperPlan plan) {
+ super(plan, new DepthFirstWalker<>(plan));
+ }
+
+ @Override
+ public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+ try {
+ addCombiner(sparkOp.physicalPlan);
+ } catch (PlanException e) {
+ throw new VisitorException(e);
+ }
+ }
+
+ // Checks for algebraic operations and, if they exist,
+ // replaces global rearrange (cogroup) with reduceBy as follows:
+ // Input:
+ // foreach (using algebraicOp)
+ // -> packager
+ // -> globalRearrange
+ // -> localRearrange
+ // Output:
+ // foreach (using algebraicOp.Final)
+ // -> reduceBy (uses algebraicOp.Intermediate)
+ // -> localRearrange
+ // -> foreach (using algebraicOp.Initial)
+ // -> CombinerRearrange
+ private void addCombiner(PhysicalPlan phyPlan) throws VisitorException, PlanException {
+
+ List<PhysicalOperator> leaves = phyPlan.getLeaves();
+ if (leaves == null || leaves.size() != 1) {
+ return;
+ }
+
+ // Ensure there is grouping.
+ List<POGlobalRearrange> glrs = PlanHelper.getPhysicalOperators(phyPlan, POGlobalRearrange.class);
+ if (glrs == null || glrs.size() == 0) {
+ return;
+ }
+ for (POGlobalRearrange glr : glrs) {
+ List<PhysicalOperator> glrSuccessors = phyPlan.getSuccessors(glr);
+ if (glrSuccessors == null || glrSuccessors.isEmpty()) {
+ continue;
+ }
+
+ if (!(glrSuccessors.get(0) instanceof POPackage)) {
+ continue;
+ }
+ POPackage poPackage = (POPackage) glrSuccessors.get(0);
+
+ List<PhysicalOperator> poPackageSuccessors = phyPlan.getSuccessors(poPackage);
+ if (poPackageSuccessors == null || poPackageSuccessors.size() != 1) {
+ continue;
+ }
+ PhysicalOperator successor = poPackageSuccessors.get(0);
+
+ if (successor instanceof POLimit) {
+ // POLimit is acceptable, as long as it has a single foreach as
+ // successor
+ List<PhysicalOperator> limitSucs = phyPlan.getSuccessors(successor);
+ if (limitSucs != null && limitSucs.size() == 1 &&
+ limitSucs.get(0) instanceof POForEach) {
+ // the code below will now further examine the foreach
+ successor = limitSucs.get(0);
+ }
+ }
+ if (successor instanceof POForEach) {
+ POForEach foreach = (POForEach) successor;
+ List<PhysicalOperator> foreachSuccessors = phyPlan.getSuccessors(foreach);
+ // multi-query
+ if (foreachSuccessors == null || foreachSuccessors.size() != 1) {
+ continue;
+ }
+ List<PhysicalPlan> feInners = foreach.getInputPlans();
+
+ // find algebraic operators and also check if the foreach statement
+ // is suitable for combiner use
+ List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = findAlgebraicOps(feInners);
+ if (algebraicOps == null || algebraicOps.size() == 0) {
+ // the plan is not combinable or there is nothing to combine
+ // we're done
+ continue;
+ }
+ try {
+ List<PhysicalOperator> glrPredecessors = phyPlan.getPredecessors(glr);
+ if (glrPredecessors == null || glrPredecessors.isEmpty()) {
+ continue;
+ }
+
+ if (!(glrPredecessors.get(0) instanceof POLocalRearrange)) {
+ continue;
+ }
+ LOG.info("Algebraic operations found. Optimizing plan to use combiner.");
+
+ POLocalRearrange rearrange = (POLocalRearrange) glrPredecessors.get(0);
+ PhysicalOperator foreachSuccessor = foreachSuccessors.get(0);
+ // Clone foreach so it can be modified to an operation post-reduce.
+ POForEach postReduceFE = foreach.clone();
+
+ // Remove the global rearrange and the POPackage that follows it.
+ convertToMapSideForEach(phyPlan, poPackage);
+
+ // replace PODistinct->Project[*] with distinct udf (which is Algebraic)
+ for (Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps) {
+ if (!(op2plan.first instanceof PODistinct)) {
+ continue;
+ }
+ CombinerOptimizerUtil.DistinctPatcher distinctPatcher
+ = new CombinerOptimizerUtil.DistinctPatcher(op2plan.second);
+ distinctPatcher.visit();
+ if (distinctPatcher.getDistinct() == null) {
+ int errCode = 2073;
+ String msg = "Problem with replacing distinct operator with distinct built-in function.";
+ throw new PlanException(msg, errCode, PigException.BUG);
+ }
+ op2plan.first = distinctPatcher.getDistinct();
+ }
+
+ // create new map foreach -
+ POForEach mfe = CombinerOptimizerUtil.createForEachWithGrpProj(foreach, poPackage.getPkgr()
+ .getKeyType());
+ Map<PhysicalOperator, Integer> op2newpos = Maps.newHashMap();
+ Integer pos = 1;
+ // create plan for each algebraic udf and add as inner plan in map-foreach
+ for (Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps) {
+ PhysicalPlan udfPlan = CombinerOptimizerUtil.createPlanWithPredecessors(op2plan.first,
+ op2plan.second);
+ mfe.addInputPlan(udfPlan, false);
+ op2newpos.put(op2plan.first, pos++);
+ }
+ CombinerOptimizerUtil.changeFunc(mfe, POUserFunc.INITIAL);
+
+ // since we will only be creating SingleTupleBag as input to
+ // the map foreach, we should flag the POProjects in the map
+ // foreach inner plans to also use SingleTupleBag
+ for (PhysicalPlan mpl : mfe.getInputPlans()) {
+ try {
+ new CombinerOptimizerUtil.fixMapProjects(mpl).visit();
+ } catch (VisitorException e) {
+ int errCode = 2089;
+ String msg = "Unable to flag project operator to use single tuple bag.";
+ throw new PlanException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ // create new combine foreach
+ POForEach cfe = CombinerOptimizerUtil.createForEachWithGrpProj(foreach, poPackage.getPkgr()
+ .getKeyType());
+ // add algebraic functions with appropriate projection
+ CombinerOptimizerUtil.addAlgebraicFuncToCombineFE(cfe, op2newpos);
+
+ // we have modified the foreach inner plans - so set them again
+ // for the foreach so that foreach can do any re-initialization
+ // around them.
+ mfe.setInputPlans(mfe.getInputPlans());
+ cfe.setInputPlans(cfe.getInputPlans());
+
+ // tell the CombinerPackager which fields need to be projected and
+ // which placed in bags. The first field is a simple project; the
+ // rest need to go into bags.
+ int numFields = algebraicOps.size() + 1; // algebraic funcs + group key
+ boolean[] bags = new boolean[numFields];
+ bags[0] = false;
+ for (int i = 1; i < numFields; i++) {
+ bags[i] = true;
+ }
+
+ // Use the POCombiner package in the combine plan
+ // as it needs to act differently than the regular
+ // package operator.
+ CombinerPackager pkgr = new CombinerPackager(poPackage.getPkgr(), bags);
+ POPackage combinePack = poPackage.clone();
+ combinePack.setPkgr(pkgr);
+
+ // A specialized local rearrange operator will replace
+ // the normal local rearrange in the map plan.
+ POLocalRearrange newRearrange = CombinerOptimizerUtil.getNewRearrange(rearrange);
+ POPreCombinerLocalRearrange combinerLocalRearrange = CombinerOptimizerUtil.getPreCombinerLR
+ (rearrange);
+ phyPlan.replace(rearrange, combinerLocalRearrange);
+
+ // Create a reduceBy operator.
+ POReduceBySpark reduceOperator = new POReduceBySpark(cfe.getOperatorKey(), cfe
+ .getRequestedParallelism(),
+ cfe.getInputPlans(), cfe.getToBeFlattened(), combinePack.getPkgr());
+ reduceOperator.setCustomPartitioner(glr.getCustomPartitioner());
+ fixReduceSideFE(postReduceFE, cfe);
+ CombinerOptimizerUtil.changeFunc(reduceOperator, POUserFunc.INTERMEDIATE);
+ updatePackager(reduceOperator, newRearrange);
+
+ // Add the new operators
+ phyPlan.add(reduceOperator);
+ phyPlan.add(newRearrange);
+ phyPlan.add(postReduceFE);
+ // Reconnect as follows :
+ // foreach (using algebraicOp.Final)
+ // -> reduceBy (uses algebraicOp.Intermediate)
+ // -> foreach (using algebraicOp.Initial)
+ phyPlan.disconnect(foreach, foreachSuccessor);
+ phyPlan.connect(foreach, newRearrange);
+ phyPlan.connect(newRearrange, reduceOperator);
+ phyPlan.connect(reduceOperator, postReduceFE);
+ phyPlan.replace(foreach, mfe);
+ phyPlan.connect(postReduceFE, foreachSuccessor);
+
+ } catch (Exception e) {
+ int errCode = 2018;
+ String msg = "Internal error. Unable to introduce the combiner for optimization.";
+ throw new OptimizerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ }
+ }
+
+ // Modifies the map side of foreach (before reduce).
+ private void convertToMapSideForEach(PhysicalPlan physicalPlan, POPackage poPackage)
+ throws PlanException {
+ LinkedList<PhysicalOperator> operatorsToRemove = new LinkedList<>();
+ for (PhysicalOperator physicalOperator : physicalPlan.getPredecessors(poPackage)) {
+ if (physicalOperator instanceof POGlobalRearrangeSpark) {
+ operatorsToRemove.add(physicalOperator);
+ break;
+ }
+ }
+ // Remove global rearranges preceding POPackage
+ for (PhysicalOperator po : operatorsToRemove) {
+ physicalPlan.removeAndReconnect(po);
+ }
+ // Remove POPackage itself.
+ physicalPlan.removeAndReconnect(poPackage);
+ }
+
+
+ // TODO: Modify the post-reduce plan in case of nested algebraic (ExpressionOperator) or logical operations.
+ private void fixReduceSideFE(POForEach postReduceFE, POForEach cfe) throws PlanException,
+ CloneNotSupportedException {
+ List<PhysicalPlan> plans = cfe.getInputPlans();
+ List<PhysicalPlan> newPlans = new ArrayList<>();
+ for (int i = 0; i < plans.size(); i++) {
+ PhysicalPlan inputPlan = plans.get(i).clone();
+ newPlans.add(inputPlan);
+ }
+ postReduceFE.setInputPlans(newPlans);
+ CombinerOptimizerUtil.changeFunc(postReduceFE, POUserFunc.FINAL);
+ postReduceFE.setResultType(DataType.TUPLE);
+ }
+
+ // Update the ReduceBy Operator with the packaging used by Local rearrange.
+ private void updatePackager(POReduceBySpark reduceOperator, POLocalRearrange lrearrange) throws OptimizerException {
+ Packager pkgr = reduceOperator.getPkgr();
+ // annotate the package with information from the LORearrange
+ // update the keyInfo information if already present in the POPackage
+ Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = pkgr.getKeyInfo();
+ if (keyInfo == null)
+ keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
+
+ if (keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) {
+ // something is wrong - we should not be getting key info
+ // for the same index from two different Local Rearranges
+ int errCode = 2087;
+ String msg = "Unexpected problem during optimization." +
+ " Found index:" + lrearrange.getIndex() +
+ " in multiple LocalRearrange operators.";
+ throw new OptimizerException(msg, errCode, PigException.BUG);
+
+ }
+ keyInfo.put(Integer.valueOf(lrearrange.getIndex()),
+ new Pair<Boolean, Map<Integer, Integer>>(
+ lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
+ pkgr.setKeyInfo(keyInfo);
+ pkgr.setKeyTuple(lrearrange.isKeyTuple());
+ pkgr.setKeyCompound(lrearrange.isKeyCompound());
+ }
+
+ /**
+ * find algebraic operators and also check if the foreach statement is
+ * suitable for combiner use
+ *
+ * @param feInners inner plans of foreach
+ * @return null if plan is not combinable, otherwise list of combinable operators
+ * @throws VisitorException
+ */
+ // TODO: Since not all combinable cases are handled yet, this does not use the utility method in CombinerOptimizerUtil
+ private static List<Pair<PhysicalOperator, PhysicalPlan>> findAlgebraicOps(List<PhysicalPlan> feInners)
+ throws VisitorException {
+ List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = Lists.newArrayList();
+
+ // check each foreach inner plan
+ for (PhysicalPlan pplan : feInners) {
+ // check for presence of non combinable operators
+ CombinerOptimizerUtil.AlgebraicPlanChecker algChecker = new CombinerOptimizerUtil.AlgebraicPlanChecker
+ (pplan);
+ algChecker.visit();
+ if (algChecker.sawNonAlgebraic) {
+ return null;
+ }
+
+ // TODO : Distinct is combinable. Handle it.
+ if (algChecker.sawDistinctAgg) {
+ return null;
+ }
+
+ List<PhysicalOperator> roots = pplan.getRoots();
+ // combinable operators have to be attached to POProject root(s)
+ for (PhysicalOperator root : roots) {
+ if (root instanceof ConstantExpression) {
+ continue;
+ }
+ if (!(root instanceof POProject)) {
+ // how can this happen? - expect root of inner plan to be
+ // constant or project. not combining it
+ return null;
+ }
+ POProject proj = (POProject) root;
+ POUserFunc combineUdf = getAlgebraicSuccessor(pplan);
+ if (combineUdf == null) {
+ if (proj.isProjectToEnd()) {
+ // project-star or project to end
+ // not combinable
+ return null;
+ }
+ // Check to see if this is a projection of the grouping column.
+ // If so, it will be a projection of col 0
+ List<Integer> cols = proj.getColumns();
+ if (cols != null && cols.size() == 1 && cols.get(0) == 0) {
+ // it is project of grouping column, so the plan is
+ // still combinable
+ continue;
+ } else {
+ //not combinable
+ return null;
+ }
+ }
+
+ // The algebraic udf can have more than one input. Add the udf only once
+ boolean exist = false;
+ for (Pair<PhysicalOperator, PhysicalPlan> pair : algebraicOps) {
+ if (pair.first.equals(combineUdf)) {
+ exist = true;
+ break;
+ }
+ }
+ if (!exist)
+ algebraicOps.add(new Pair<PhysicalOperator, PhysicalPlan>(combineUdf, pplan));
+ }
+ }
+
+ return algebraicOps;
+ }
+
+ /**
+ * Look for an algebraic POUserFunc that is the leaf of an input plan.
+ *
+ * @param pplan physical plan
+ * @return null if any operator other than POProject or a non-algebraic POUserFunc is
+ * found while going down the plan; otherwise the algebraic POUserFunc is returned
+ */
+ private static POUserFunc getAlgebraicSuccessor(PhysicalPlan pplan) {
+ // check if it ends in a UDF
+ List<PhysicalOperator> leaves = pplan.getLeaves();
+ if (leaves == null || leaves.size() != 1) {
+ return null;
+ }
+
+ PhysicalOperator succ = leaves.get(0);
+ if (succ instanceof POUserFunc && ((POUserFunc) succ).combinable()) {
+ return (POUserFunc) succ;
+ }
+
+ // some other operator ? can't combine
+ return null;
+ }
+}
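
For a concrete picture of what addCombiner() rewrites, here is a script of the shape the optimizer targets, mirroring the pattern TestCombiner exercises. A hedged sketch, assuming a local-mode PigServer and a hypothetical input.txt:

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class CombinerEligibleScript {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL); // illustrative local run
            pig.registerQuery("A = load 'input.txt' as (k:chararray, v:int);");
            pig.registerQuery("B = group A by k;");
            // COUNT and SUM are algebraic, so addCombiner() can rewrite the plan:
            // the foreach splits into Initial (map side), Intermediate (inside
            // POReduceBySpark) and Final (post-reduce foreach) phases.
            pig.registerQuery("C = foreach B generate group, COUNT(A), SUM(A.v);");
            pig.explain("C", System.out); // rewritten plan shows the ReduceBy operator
        }
    }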
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java?rev=1727472&r1=1727471&r2=1727472&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java Fri Jan 29 03:45:26 2016
@@ -430,7 +430,7 @@ public class CombinerOptimizerUtil {
* @param keyType type for group-by key
* @return new POForeach
*/
- private static POForEach createForEachWithGrpProj(POForEach foreach, byte keyType) {
+ public static POForEach createForEachWithGrpProj(POForEach foreach, byte keyType) {
String scope = foreach.getOperatorKey().scope;
POForEach newFE = new POForEach(createOperatorKey(scope), new ArrayList<PhysicalPlan>());
newFE.addOriginalLocation(foreach.getAlias(), foreach.getOriginalLocations());
@@ -454,7 +454,7 @@ public class CombinerOptimizerUtil {
* @throws CloneNotSupportedException
* @throws PlanException
*/
- private static PhysicalPlan createPlanWithPredecessors(PhysicalOperator algeOp, PhysicalPlan pplan)
+ public static PhysicalPlan createPlanWithPredecessors(PhysicalOperator algeOp, PhysicalPlan pplan)
throws CloneNotSupportedException, PlanException {
PhysicalPlan newplan = new PhysicalPlan();
addPredecessorsToPlan(algeOp, pplan, newplan);
@@ -491,7 +491,7 @@ public class CombinerOptimizerUtil {
* @throws CloneNotSupportedException
* @throws PlanException
*/
- private static void addAlgebraicFuncToCombineFE(POForEach cfe, Map<PhysicalOperator, Integer> op2newpos)
+ public static void addAlgebraicFuncToCombineFE(POForEach cfe, Map<PhysicalOperator, Integer> op2newpos)
throws CloneNotSupportedException, PlanException {
// an array that we will first populate with physical operators in order
// of their position in input. Used while adding plans to combine
@@ -561,7 +561,7 @@ public class CombinerOptimizerUtil {
* @param rearrange
* @return
*/
- private static POPreCombinerLocalRearrange getPreCombinerLR(POLocalRearrange rearrange) {
+ public static POPreCombinerLocalRearrange getPreCombinerLR(POLocalRearrange rearrange) {
String scope = rearrange.getOperatorKey().scope;
POPreCombinerLocalRearrange pclr = new POPreCombinerLocalRearrange(
createOperatorKey(scope),
@@ -602,7 +602,7 @@ public class CombinerOptimizerUtil {
* @param type
* @throws PlanException
*/
- private static void changeFunc(POForEach fe, byte type) throws PlanException {
+ public static void changeFunc(POForEach fe, byte type) throws PlanException {
for (PhysicalPlan plan : fe.getInputPlans()) {
List<PhysicalOperator> leaves = plan.getLeaves();
if (leaves == null || leaves.size() != 1) {
@@ -640,7 +640,7 @@ public class CombinerOptimizerUtil {
* @throws PlanException
* @throws CloneNotSupportedException
*/
- private static POLocalRearrange getNewRearrange(POLocalRearrange rearrange)
+ public static POLocalRearrange getNewRearrange(POLocalRearrange rearrange)
throws PlanException, CloneNotSupportedException {
POLocalRearrange newRearrange = rearrange.clone();
@@ -663,13 +663,13 @@ public class CombinerOptimizerUtil {
* Checks if there is something that prevents the use of algebraic interface,
* and looks for the PODistinct that can be used as algebraic
*/
- private static class AlgebraicPlanChecker extends PhyPlanVisitor {
- boolean sawNonAlgebraic = false;
- boolean sawDistinctAgg = false;
+ public static class AlgebraicPlanChecker extends PhyPlanVisitor {
+ public boolean sawNonAlgebraic = false;
+ public boolean sawDistinctAgg = false;
private boolean sawForeach = false;
private PODistinct distinct = null;
- AlgebraicPlanChecker(PhysicalPlan plan) {
+ public AlgebraicPlanChecker(PhysicalPlan plan) {
super(plan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan));
}
@@ -818,7 +818,7 @@ public class CombinerOptimizerUtil {
* with
* POUserFunc(org.apache.pig.builtin.Distinct)[DataBag]
*/
- private static class DistinctPatcher extends PhyPlanVisitor {
+ public static class DistinctPatcher extends PhyPlanVisitor {
private POUserFunc distinct = null;
/**
* @param plan
@@ -884,12 +884,12 @@ public class CombinerOptimizerUtil {
}
}
- POUserFunc getDistinct() {
+ public POUserFunc getDistinct() {
return distinct;
}
}
- private static class fixMapProjects extends PhyPlanVisitor {
+ public static class fixMapProjects extends PhyPlanVisitor {
public fixMapProjects(PhysicalPlan plan) {
this(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java?rev=1727472&r1=1727471&r2=1727472&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java Fri Jan 29 03:45:26 2016
@@ -43,6 +43,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.io.PigNullableWritable;
@@ -432,6 +433,10 @@ public class SecondaryKeyOptimizerUtil {
} else if (root instanceof POPackage) {
POPackage pack = (POPackage) root;
pack.getPkgr().setUseSecondaryKey(true);
+ } else if (root instanceof POReduceBySpark) {
+ POReduceBySpark reduceBySpark = (POReduceBySpark) root;
+ reduceBySpark.setUseSecondaryKey(true);
+ reduceBySpark.setSecondarySortOrder(secondarySortKeyInfo.getAscs());
}
}
return secKeyOptimizerInfo;
Modified: pig/branches/spark/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCombiner.java?rev=1727472&r1=1727471&r2=1727472&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestCombiner.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestCombiner.java Fri Jan 29 03:45:26 2016
@@ -118,11 +118,15 @@ public class TestCombiner {
pig.registerQuery("B = group A by ($0, $1);");
pig.registerQuery("C = foreach B generate flatten(group), COUNT($1);");
+ // Since the input has no schema, Util.getTuplesFromConstantTupleStrings fails an assertion.
+ List<String> resultTuples = new ArrayList<>();
+ resultTuples.add("(a,b,2)");
+ resultTuples.add("(a,c,1)");
Iterator<Tuple> resultIterator = pig.openIterator("C");
Tuple tuple = resultIterator.next();
- assertEquals("(a,b,2)", tuple.toString());
+ assertTrue(resultTuples.contains(tuple.toString()));
tuple = resultIterator.next();
- assertEquals("(a,c,1)", tuple.toString());
+ assertTrue(resultTuples.contains(tuple.toString()));
return inputFileName;
}
@@ -185,7 +189,7 @@ public class TestCombiner {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
pigServer.explain("c", ps);
- assertTrue(baos.toString().matches("(?si).*combine plan.*"));
+ checkCombinerUsed(pigServer, "c", true);
Iterator<Tuple> it = pigServer.openIterator("c");
Tuple t = it.next();
@@ -235,7 +239,7 @@ public class TestCombiner {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
pigServer.explain("c", ps);
- assertTrue(baos.toString().matches("(?si).*combine plan.*"));
+ checkCombinerUsed(pigServer, "c", true);
HashMap<String, Object[]> results = new HashMap<String, Object[]>();
results.put("pig1", new Object[] { "pig1", 3L, 57L, 5.2, 75L, 9.4, 3L, 3L, 57L });
@@ -256,6 +260,56 @@ public class TestCombiner {
}
@Test
+ public void testGroupAndUnion() throws Exception {
+ // test use of combiner when group elements are accessed in the foreach
+ String input1[] = {
+ "ABC\t1\ta\t1",
+ "ABC\t1\tb\t2",
+ "ABC\t1\ta\t3",
+ "ABC\t2\tb\t4",
+ };
+
+ Util.createInputFile(cluster, "testGroupElements1.txt", input1);
+ PigServer pigServer = new PigServer(cluster.getExecType(), properties);
+ pigServer.debugOn();
+ pigServer.registerQuery("a1 = load 'testGroupElements1.txt' " +
+ "as (str:chararray, num1:int, alph : chararray, num2 : int);");
+ pigServer.registerQuery("b1 = group a1 by str;");
+
+ // check if combiner is present or not for various forms of foreach
+ pigServer.registerQuery("c1 = foreach b1 generate flatten(group), COUNT(a1.alph), SUM(a1.num2); ");
+
+ String input2[] = {
+ "DEF\t2\ta\t3",
+ "DEF\t2\td\t5",
+ };
+
+ Util.createInputFile(cluster, "testGroupElements2.txt", input2);
+ pigServer.registerQuery("a2 = load 'testGroupElements2.txt' " +
+ "as (str:chararray, num1:int, alph : chararray, num2 : int);");
+ pigServer.registerQuery("b2 = group a2 by str;");
+
+ // check if combiner is present or not for various forms of foreach
+ pigServer.registerQuery("c2 = foreach b2 generate flatten(group), COUNT(a2.alph), SUM(a2.num2); ");
+
+ pigServer.registerQuery("c = union c1, c2;");
+
+ List<Tuple> expectedRes =
+ Util.getTuplesFromConstantTupleStrings(
+ new String[]{
+ "('ABC',4L,10L)",
+ "('DEF',2L,8L)",
+ });
+
+ Iterator<Tuple> it = pigServer.openIterator("c");
+ Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+ Util.deleteFile(cluster, "testGroupElements1.txt");
+ Util.deleteFile(cluster, "testGroupElements2.txt");
+ pigServer.shutdown();
+ }
+
+ @Test
public void testGroupElements() throws Exception {
// test use of combiner when group elements are accessed in the foreach
String input[] = {
@@ -352,12 +406,12 @@ public class TestCombiner {
pigServer.shutdown();
}
- private void checkCombinerUsed(PigServer pigServer, String string, boolean combineExpected)
+ private void checkCombinerUsed(PigServer pigServer, String variable, boolean combineExpected)
throws IOException {
// make sure there is a combine plan in the explain output
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
- pigServer.explain("c", ps);
+ pigServer.explain(variable, ps);
boolean combinerFound = baos.toString().matches("(?si).*combine plan.*");
System.out.println(baos.toString());
assertEquals("is combiner present as expected", combineExpected, combinerFound);