You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2016/09/14 04:01:13 UTC
svn commit: r1760625 - in
/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark:
converter/ operator/ optimizer/
Author: xuefu
Date: Wed Sep 14 04:01:13 2016
New Revision: 1760625
URL: http://svn.apache.org/viewvc?rev=1760625&view=rev
Log:
PIG-4969: Optimize combine case for spark mode (Liyun via Xuefu)
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeFunction.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java?rev=1760625&r1=1760624&r2=1760625&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java Wed Sep 14 04:01:13 2016
@@ -23,17 +23,14 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.Objects;
import scala.Product2;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.Seq;
-import scala.runtime.AbstractFunction1;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
@@ -44,9 +41,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.spark.Partitioner;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.CoGroupedRDD;
import org.apache.spark.rdd.RDD;
@@ -69,11 +63,12 @@ public class JoinGroupSparkConverter imp
for (int i = 0; i < predecessors.size(); i++) {
RDD<Tuple> rdd = predecessors.get(i);
- rddAfterLRA.add(rdd.map(new LocalRearrangeFunction(lraOps.get(i), glaOp),
+ rddAfterLRA.add(rdd.map(new LocalRearrangeFunction(lraOps.get(i), glaOp.isUseSecondaryKey(), glaOp
+ .getSecondarySortOrder()),
SparkUtil.<IndexedKey, Tuple>getTuple2Manifest()));
}
if (rddAfterLRA.size() == 1 && useSecondaryKey) {
- return handleSecondarySort(rddAfterLRA.get(0), pkgOp);
+ return SecondaryKeySortUtil.handleSecondarySort(rddAfterLRA.get(0), pkgOp);
} else {
CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>(
@@ -88,218 +83,6 @@ public class JoinGroupSparkConverter imp
}
}
- private RDD<Tuple> handleSecondarySort(
- RDD<Tuple2<IndexedKey, Tuple>> rdd, POPackage pkgOp) {
- JavaPairRDD<IndexedKey, Tuple> pairRDD = JavaPairRDD.fromRDD(rdd, SparkUtil.getManifest(IndexedKey.class),
- SparkUtil.getManifest(Tuple.class));
-
- int partitionNums = pairRDD.partitions().size();
- //repartition to group tuples with same indexedkey to same partition
- JavaPairRDD<IndexedKey, Tuple> sorted = pairRDD.repartitionAndSortWithinPartitions(
- new IndexedKeyPartitioner(partitionNums));
- //Package tuples with same indexedkey as the result: (key,(val1,val2,val3,...))
- return sorted.mapPartitions(new AccumulateByKey(pkgOp), true).rdd();
- }
-
- //Group tuples with same IndexKey into same partition
- private static class IndexedKeyPartitioner extends Partitioner {
- private int partition;
- public IndexedKeyPartitioner(int partition) {
- this.partition = partition;
- }
- @Override
- public int getPartition(Object obj) {
- IndexedKey indexedKey = (IndexedKey) obj;
- Tuple key = (Tuple) indexedKey.getKey();
-
- int hashCode = 0;
- try {
- hashCode = Objects.hashCode(key.get(0));
- } catch (ExecException e) {
- throw new RuntimeException("IndexedKeyPartitioner#getPartition: ", e);
- }
- return Math.abs(hashCode) % partition;
- }
-
- @Override
- public int numPartitions() {
- return partition;
- }
- }
-
- //Package tuples with same indexedkey as the result: (key,(val1,val2,val3,...))
- //Send (key,Iterator) to POPackage, use POPackage#getNextTuple to get the result
- private static class AccumulateByKey implements FlatMapFunction<Iterator<Tuple2<IndexedKey, Tuple>>, Tuple>,
- Serializable {
- private POPackage pkgOp;
-
- public AccumulateByKey(POPackage pkgOp) {
- this.pkgOp = pkgOp;
- }
-
- @Override
- public Iterable<Tuple> call(final Iterator<Tuple2<IndexedKey, Tuple>> it) throws Exception {
- return new Iterable<Tuple>() {
- Object curKey = null;
- ArrayList curValues = new ArrayList();
-
- @Override
- public Iterator<Tuple> iterator() {
- return new Iterator<Tuple>() {
-
- @Override
- public boolean hasNext() {
- return it.hasNext() || curKey != null;
- }
-
- @Override
- public Tuple next() {
- while (it.hasNext()) {
- Tuple2<IndexedKey, Tuple> t = it.next();
- //key changes, restruct the last tuple by curKey, curValues and return
- Object tMainKey = null;
- try {
- tMainKey = ((Tuple) (t._1()).getKey()).get(0);
- if (curKey != null && !curKey.equals(tMainKey)) {
- Tuple result = restructTuple(curKey, new ArrayList(curValues));
- curValues.clear();
- curKey = tMainKey;
- curValues.add(t._2());
- return result;
- }
- curKey = tMainKey;
- //if key does not change, just append the value to the same key
- curValues.add(t._2());
-
- } catch (ExecException e) {
- throw new RuntimeException("AccumulateByKey throw exception: ", e);
- }
- }
- if (curKey == null) {
- throw new RuntimeException("AccumulateByKey curKey is null");
- }
-
- //if we get here, this should be the last record
- Tuple res = restructTuple(curKey, curValues);
- curKey = null;
- return res;
- }
-
-
- @Override
- public void remove() {
- // Not implemented.
- // throw Unsupported Method Invocation Exception.
- throw new UnsupportedOperationException();
- }
- };
- }
- };
- }
-
- private Tuple restructTuple(final Object curKey, final ArrayList<Tuple> curValues) {
- try {
- Tuple retVal = null;
- PigNullableWritable retKey = new PigNullableWritable() {
-
- public Object getValueAsPigType() {
- return curKey;
- }
- };
-
- //Here restruct a tupleIterator, later POPackage#tupIter will use it.
- final Iterator<Tuple> tupleItearator = curValues.iterator();
- Iterator<NullableTuple> iterator = new Iterator<NullableTuple>() {
- public boolean hasNext() {
- return tupleItearator.hasNext();
- }
-
- public NullableTuple next() {
- Tuple t = tupleItearator.next();
- return new NullableTuple(t);
- }
-
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- pkgOp.setInputs(null);
- pkgOp.attachInput(retKey, iterator);
- Result res = pkgOp.getNextTuple();
- if (res.returnStatus == POStatus.STATUS_OK) {
- retVal = (Tuple) res.result;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("AccumulateByKey out: " + retVal);
- }
- return retVal;
- } catch (ExecException e) {
- throw new RuntimeException("AccumulateByKey#restructTuple throws exception: ", e);
- }
- }
- }
-
- private static class LocalRearrangeFunction extends
- AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements Serializable {
-
- private final POLocalRearrange lra;
-
- private boolean useSecondaryKey;
- private boolean[] secondarySortOrder;
-
- public LocalRearrangeFunction(POLocalRearrange lra, POGlobalRearrangeSpark glaOp) {
- if( glaOp.isUseSecondaryKey()) {
- this.useSecondaryKey = glaOp.isUseSecondaryKey();
- this.secondarySortOrder = glaOp.getSecondarySortOrder();
- }
- this.lra = lra;
- }
-
- @Override
- public Tuple2<IndexedKey, Tuple> apply(Tuple t) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("LocalRearrangeFunction in " + t);
- }
- Result result;
- try {
- lra.setInputs(null);
- lra.attachInput(t);
- result = lra.getNextTuple();
-
- if (result == null) {
- throw new RuntimeException(
- "Null response found for LocalRearange on tuple: "
- + t);
- }
-
- switch (result.returnStatus) {
- case POStatus.STATUS_OK:
- // (index, key, value without keys)
- Tuple resultTuple = (Tuple) result.result;
- Object key = resultTuple.get(1);
- IndexedKey indexedKey = new IndexedKey((Byte) resultTuple.get(0), key);
- if( useSecondaryKey) {
- indexedKey.setUseSecondaryKey(useSecondaryKey);
- indexedKey.setSecondarySortOrder(secondarySortOrder);
- }
- Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(indexedKey,
- (Tuple) resultTuple.get(2));
- if (LOG.isDebugEnabled()) {
- LOG.debug("LocalRearrangeFunction out " + out);
- }
- return out;
- default:
- throw new RuntimeException(
- "Unexpected response code from operator "
- + lra + " : " + result);
- }
- } catch (ExecException e) {
- throw new RuntimeException(
- "Couldn't do LocalRearange on tuple: " + t, e);
- }
- }
-
- }
/**
* Send cogroup output where each element is {key, bag[]} to PoPackage
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeFunction.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeFunction.java?rev=1760625&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeFunction.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeFunction.java Wed Sep 14 04:01:13 2016
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.Serializable;
+
+import scala.Tuple2;
+import scala.runtime.AbstractFunction1;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Spark map function, used by JoinGroupSparkConverter and ReduceByConverter, that runs each input
+ * tuple through a {@link POLocalRearrange} and converts the rearranged result into a shuffle pair.
+ * <p>
+ * The local rearrange produces {@code Tuple(index, key, value)}; this function returns
+ * {@code Tuple2<IndexedKey(index, key), value>}. When secondary-key sorting is requested, each
+ * {@link IndexedKey} is additionally flagged via {@code setUseSecondaryKey} and given the
+ * configured secondary sort order.
+ */
+public class LocalRearrangeFunction extends
+        AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements Serializable {
+    private static final Log LOG = LogFactory
+            .getLog(LocalRearrangeFunction.class);
+
+    private final POLocalRearrange lra;
+
+    // When true, every emitted IndexedKey is flagged for secondary-key sorting.
+    private final boolean useSecondaryKey;
+    // Handed to IndexedKey#setSecondarySortOrder; only consulted when useSecondaryKey is true.
+    private final boolean[] secondarySortOrder;
+
+    /**
+     * @param lra                local rearrange operator applied to every input tuple
+     * @param useSecondaryKey    whether emitted keys are flagged for secondary-key sorting
+     * @param secondarySortOrder sort order handed to each IndexedKey when {@code useSecondaryKey}
+     *                           is true; ignored otherwise
+     */
+    public LocalRearrangeFunction(POLocalRearrange lra, boolean useSecondaryKey, boolean[] secondarySortOrder) {
+        this.useSecondaryKey = useSecondaryKey;
+        this.secondarySortOrder = secondarySortOrder;
+        this.lra = lra;
+    }
+
+    /**
+     * Rearranges one tuple.
+     *
+     * @param t input tuple
+     * @return {@code (IndexedKey(index, key), value)} built from the rearranged
+     *         {@code (index, key, value)}
+     * @throws RuntimeException if the operator returns null, returns a non-OK status, or throws
+     *                          an ExecException
+     */
+    @Override
+    public Tuple2<IndexedKey, Tuple> apply(Tuple t) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("LocalRearrangeFunction in " + t);
+        }
+        Result result;
+        try {
+            // The operator is reused for every record: clear its inputs and attach t directly.
+            lra.setInputs(null);
+            lra.attachInput(t);
+            result = lra.getNextTuple();
+
+            if (result == null) {
+                throw new RuntimeException(
+                        "Null response found for LocalRearange on tuple: "
+                                + t);
+            }
+
+            switch (result.returnStatus) {
+                case POStatus.STATUS_OK:
+                    // (index, key, value without keys)
+                    Tuple resultTuple = (Tuple) result.result;
+                    Object key = resultTuple.get(1);
+                    IndexedKey indexedKey = new IndexedKey((Byte) resultTuple.get(0), key);
+                    if (useSecondaryKey) {
+                        indexedKey.setUseSecondaryKey(useSecondaryKey);
+                        indexedKey.setSecondarySortOrder(secondarySortOrder);
+                    }
+                    Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(indexedKey,
+                            (Tuple) resultTuple.get(2));
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("LocalRearrangeFunction out " + out);
+                    }
+                    return out;
+                default:
+                    throw new RuntimeException(
+                            "Unexpected response code from operator "
+                                    + lra + " : " + result);
+            }
+        } catch (ExecException e) {
+            throw new RuntimeException(
+                    "Couldn't do LocalRearange on tuple: " + t, e);
+        }
+    }
+
+}
+
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java?rev=1760625&r1=1760624&r2=1760625&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java Wed Sep 14 04:01:13 2016
@@ -55,25 +55,24 @@ public class ReduceByConverter implement
int parallelism = SparkUtil.getParallelism(predecessors, op);
RDD<Tuple> rdd = predecessors.get(0);
-
- JavaRDD<Tuple2<IndexedKey, Tuple>> rddPair;
+ RDD<Tuple2<IndexedKey, Tuple>> rddPair
+ = rdd.map(new LocalRearrangeFunction(op.getLgr(), op.isUseSecondaryKey(), op.getSecondarySortOrder())
+ , SparkUtil.<IndexedKey, Tuple>getTuple2Manifest());
if (op.isUseSecondaryKey()) {
- rddPair = handleSecondarySort(rdd, op, parallelism);
+ return SecondaryKeySortUtil.handleSecondarySort(rddPair, op.getPkg());
} else {
- JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
- rddPair = jrdd.map(new ToKeyValueFunction(op));
- }
- PairRDDFunctions<IndexedKey, Tuple> pairRDDFunctions
- = new PairRDDFunctions<>(rddPair.rdd(),
- SparkUtil.getManifest(IndexedKey.class),
- SparkUtil.getManifest(Tuple.class), null);
-
- RDD<Tuple2<IndexedKey, Tuple>> tupleRDD = pairRDDFunctions.reduceByKey(
- SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism),
- new MergeValuesFunction(op));
- LOG.debug("Custom Partitioner and parallelims used : " + op.getCustomPartitioner() + ", " + parallelism);
+ PairRDDFunctions<IndexedKey, Tuple> pairRDDFunctions
+ = new PairRDDFunctions<>(rddPair,
+ SparkUtil.getManifest(IndexedKey.class),
+ SparkUtil.getManifest(Tuple.class), null);
+
+ RDD<Tuple2<IndexedKey, Tuple>> tupleRDD = pairRDDFunctions.reduceByKey(
+ SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism),
+ new MergeValuesFunction(op));
+ LOG.debug("Custom Partitioner and parallelims used : " + op.getCustomPartitioner() + ", " + parallelism);
- return tupleRDD.map(new ToTupleFunction(op), SparkUtil.getManifest(Tuple.class));
+ return tupleRDD.map(new ToTupleFunction(op), SparkUtil.getManifest(Tuple.class));
+ }
}
private JavaRDD<Tuple2<IndexedKey, Tuple>> handleSecondarySort(
@@ -188,8 +187,8 @@ public class ReduceByConverter implement
t.append(key);
t.append(bag);
- poReduce.getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true});
- Tuple packagedTuple = (Tuple) poReduce.getPkgr().getNext().result;
+ poReduce.getPkg().getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true});
+ Tuple packagedTuple = (Tuple) poReduce.getPkg().getPkgr().getNext().result;
// Perform the operation
LOG.debug("MergeValuesFunction packagedTuple : " + t);
@@ -241,8 +240,8 @@ public class ReduceByConverter implement
bag.add((Tuple) v1._2().get(1));
t.append(key);
t.append(bag);
- poReduce.getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true});
- packagedTuple = (Tuple) poReduce.getPkgr().getNext().result;
+ poReduce.getPkg().getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true});
+ packagedTuple = (Tuple) poReduce.getPkg().getPkgr().getNext().result;
} catch (ExecException e) {
throw new RuntimeException(e);
}
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java?rev=1760625&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java Wed Sep 14 04:01:13 2016
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Objects;
+
+import scala.Tuple2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.rdd.RDD;
+
+/**
+ * Utility functions for secondary-key sorting, used by ReduceByConverter and JoinGroupSparkConverter.
+ */
+public class SecondaryKeySortUtil {
+    private static final Log LOG = LogFactory
+            .getLog(SecondaryKeySortUtil.class);
+
+    /**
+     * Groups and packages rearranged records when secondary-key sorting is in use.
+     * <p>
+     * Records are repartitioned by the hash of their main key (field 0 of the IndexedKey's key
+     * tuple) and sorted within each partition; consecutive records sharing a main key are then
+     * packaged together through {@code pkgOp}. The partition count is kept equal to the input's.
+     *
+     * @param rdd   (IndexedKey, value) pairs, e.g. as produced by LocalRearrangeFunction
+     * @param pkgOp package operator invoked once per run of equal main keys
+     * @return the packaged tuples
+     */
+    public static RDD<Tuple> handleSecondarySort(
+            RDD<Tuple2<IndexedKey, Tuple>> rdd, POPackage pkgOp) {
+        JavaPairRDD<IndexedKey, Tuple> pairRDD = JavaPairRDD.fromRDD(rdd, SparkUtil.getManifest(IndexedKey.class),
+                SparkUtil.getManifest(Tuple.class));
+
+        int partitionNums = pairRDD.partitions().size();
+        // Repartition so that all tuples with the same main key land in the same partition,
+        // sorted by IndexedKey within that partition.
+        JavaPairRDD<IndexedKey, Tuple> sorted = pairRDD.repartitionAndSortWithinPartitions(
+                new IndexedKeyPartitioner(partitionNums));
+        // Package tuples with the same main key as the result: (key, (val1, val2, val3, ...))
+        return sorted.mapPartitions(new AccumulateByKey(pkgOp), true).rdd();
+    }
+
+    /**
+     * Packages each run of consecutive records that share a main key as (key, (val1, val2, ...)).
+     * Each run is sent to POPackage as (key, Iterator), and POPackage#getNextTuple yields the result.
+     */
+    private static class AccumulateByKey implements FlatMapFunction<Iterator<Tuple2<IndexedKey, Tuple>>, Tuple>,
+            Serializable {
+        private final POPackage pkgOp;
+
+        public AccumulateByKey(POPackage pkgOp) {
+            this.pkgOp = pkgOp;
+        }
+
+        @Override
+        public Iterable<Tuple> call(final Iterator<Tuple2<IndexedKey, Tuple>> it) throws Exception {
+            return new Iterable<Tuple>() {
+                @Override
+                public Iterator<Tuple> iterator() {
+                    // Grouping state lives on the iterator so each iterator() call starts clean.
+                    return new Iterator<Tuple>() {
+                        // True while a group has been started but not yet emitted. This is tracked
+                        // separately from curKey because a main key may itself be null.
+                        private boolean hasPendingGroup = false;
+                        private Object curKey = null;
+                        private ArrayList<Tuple> curValues = new ArrayList<Tuple>();
+
+                        @Override
+                        public boolean hasNext() {
+                            return it.hasNext() || hasPendingGroup;
+                        }
+
+                        @Override
+                        public Tuple next() {
+                            while (it.hasNext()) {
+                                Tuple2<IndexedKey, Tuple> t = it.next();
+                                Object tMainKey;
+                                try {
+                                    tMainKey = ((Tuple) (t._1()).getKey()).get(0);
+                                } catch (ExecException e) {
+                                    throw new RuntimeException("AccumulateByKey throw exception: ", e);
+                                }
+                                if (hasPendingGroup && !Objects.equals(curKey, tMainKey)) {
+                                    // Main key changed: emit the finished group and start a new one
+                                    // with t. The finished list is handed off, not reused, so the
+                                    // package operator never sees it mutated.
+                                    Tuple result = restructTuple(curKey, curValues);
+                                    curKey = tMainKey;
+                                    curValues = new ArrayList<Tuple>();
+                                    curValues.add(t._2());
+                                    return result;
+                                }
+                                // Same key (or first record): append to the current group.
+                                curKey = tMainKey;
+                                hasPendingGroup = true;
+                                curValues.add(t._2());
+                            }
+                            if (!hasPendingGroup) {
+                                throw new java.util.NoSuchElementException("AccumulateByKey has no more elements");
+                            }
+
+                            // Input exhausted: emit the last group.
+                            Tuple res = restructTuple(curKey, curValues);
+                            hasPendingGroup = false;
+                            curKey = null;
+                            curValues = new ArrayList<Tuple>();
+                            return res;
+                        }
+
+                        @Override
+                        public void remove() {
+                            throw new UnsupportedOperationException();
+                        }
+                    };
+                }
+            };
+        }
+
+        /**
+         * Runs one group through the package operator.
+         * NOTE(review): when the operator does not return STATUS_OK, null is returned and emitted as
+         * an output element; confirm downstream operators tolerate that.
+         *
+         * @param curKey    main key of the group (may be null)
+         * @param curValues the group's values; callers should not modify the list afterwards in case
+         *                  the package operator consumes the iterator lazily
+         * @return the packaged tuple, or null if the package operator did not return STATUS_OK
+         */
+        private Tuple restructTuple(final Object curKey, final ArrayList<Tuple> curValues) {
+            try {
+                Tuple retVal = null;
+                // Key wrapper handed to POPackage; it exposes the raw main key.
+                PigNullableWritable retKey = new PigNullableWritable() {
+
+                    public Object getValueAsPigType() {
+                        return curKey;
+                    }
+                };
+
+                // Wrap the values as NullableTuples, the element type passed to POPackage#attachInput.
+                final Iterator<Tuple> tupleItearator = curValues.iterator();
+                Iterator<NullableTuple> iterator = new Iterator<NullableTuple>() {
+                    @Override
+                    public boolean hasNext() {
+                        return tupleItearator.hasNext();
+                    }
+
+                    @Override
+                    public NullableTuple next() {
+                        Tuple t = tupleItearator.next();
+                        return new NullableTuple(t);
+                    }
+
+                    @Override
+                    public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+                pkgOp.setInputs(null);
+                pkgOp.attachInput(retKey, iterator);
+                Result res = pkgOp.getNextTuple();
+                if (res.returnStatus == POStatus.STATUS_OK) {
+                    retVal = (Tuple) res.result;
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("AccumulateByKey out: " + retVal);
+                }
+                return retVal;
+            } catch (ExecException e) {
+                throw new RuntimeException("AccumulateByKey#restructTuple throws exception: ", e);
+            }
+        }
+    }
+
+    /**
+     * Sends tuples whose IndexedKey shares the same main key (field 0 of the key tuple) to the same
+     * partition.
+     */
+    private static class IndexedKeyPartitioner extends Partitioner {
+        private final int partition;
+
+        public IndexedKeyPartitioner(int partition) {
+            this.partition = partition;
+        }
+
+        @Override
+        public int getPartition(Object obj) {
+            IndexedKey indexedKey = (IndexedKey) obj;
+            Tuple key = (Tuple) indexedKey.getKey();
+
+            int hashCode = 0;
+            try {
+                hashCode = Objects.hashCode(key.get(0));
+            } catch (ExecException e) {
+                throw new RuntimeException("IndexedKeyPartitioner#getPartition: ", e);
+            }
+            // Take the remainder before abs(): Math.abs(Integer.MIN_VALUE) is still negative and
+            // would produce an invalid partition id. For every other hash this equals
+            // Math.abs(hashCode) % partition, so key placement is otherwise unchanged.
+            return Math.abs(hashCode % partition);
+        }
+
+        @Override
+        public int numPartitions() {
+            return partition;
+        }
+    }
+}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java?rev=1760625&r1=1760624&r2=1760625&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java Wed Sep 14 04:01:13 2016
@@ -21,7 +21,8 @@ import java.util.List;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.plan.OperatorKey;
@@ -31,16 +32,19 @@ import org.apache.pig.impl.plan.Operator
*/
public class POReduceBySpark extends POForEach {
private String customPartitioner;
+ protected POLocalRearrange lr;
+ protected POPackage pkg;
- protected Packager pkgr;
-
- public POReduceBySpark(OperatorKey k, int rp, List<PhysicalPlan> inp, List<Boolean> isToBeFlattened, Packager pkgr){
+ public POReduceBySpark(OperatorKey k, int rp, List<PhysicalPlan> inp, List<Boolean> isToBeFlattened, POPackage
+ pkg, POLocalRearrange lr){
super(k, rp, inp, isToBeFlattened);
- this.pkgr = pkgr;
+ this.pkg = pkg;
+ this.lr = lr;
+ this.addOriginalLocation(lr.getAlias(), lr.getOriginalLocations());
}
- public Packager getPkgr() {
- return pkgr;
+ public POPackage getPkg() {
+ return pkg;
}
@Override
@@ -93,4 +97,9 @@ public class POReduceBySpark extends POF
public void setCustomPartitioner(String customPartitioner) {
this.customPartitioner = customPartitioner;
}
+
+    /**
+     * @return the local rearrange operator held by this reduceBy; ReduceByConverter maps the
+     *         input through it before reducing by key
+     */
+    public POLocalRearrange getLgr() {
+        return this.lr;
+    }
+
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java?rev=1760625&r1=1760624&r2=1760625&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java Wed Sep 14 04:01:13 2016
@@ -86,7 +86,6 @@ public class CombinerOptimizer extends S
// Output:
// foreach (using algebraicOp.Final)
// -> reduceBy (uses algebraicOp.Intermediate)
- // -> localRearrange
// -> foreach (using algebraicOp.Initial)
// -> CombinerRearrange
private void addCombiner(PhysicalPlan phyPlan) throws VisitorException, PlanException, CloneNotSupportedException {
@@ -251,7 +250,7 @@ public class CombinerOptimizer extends S
// Create a reduceBy operator.
POReduceBySpark reduceOperator = new POReduceBySpark(cfe.getOperatorKey(), cfe
.getRequestedParallelism(),
- cfe.getInputPlans(), cfe.getToBeFlattened(), combinePack.getPkgr());
+ cfe.getInputPlans(), cfe.getToBeFlattened(), combinePack, newRearrange);
reduceOperator.setCustomPartitioner(glr.getCustomPartitioner());
fixReduceSideFE(postReduceFE, algebraicOps);
CombinerOptimizerUtil.changeFunc(reduceOperator, POUserFunc.INTERMEDIATE);
@@ -259,14 +258,11 @@ public class CombinerOptimizer extends S
// Add the new operators
phyPlan.add(reduceOperator);
- phyPlan.add(newRearrange);
phyPlan.add(mfe);
// Connect the new operators as follows:
// reduceBy (using algebraicOp.Intermediate)
- // -> rearrange
// -> foreach (using algebraicOp.Initial)
- phyPlan.connect(mfe, newRearrange);
- phyPlan.connect(newRearrange, reduceOperator);
+ phyPlan.connect(mfe, reduceOperator);
// Insert the reduce stage between combiner rearrange and its successor.
phyPlan.disconnect(combinerLocalRearrange, packageSuccessor);
@@ -329,7 +325,7 @@ public class CombinerOptimizer extends S
// Update the ReduceBy Operator with the packaging used by Local rearrange.
private void updatePackager(POReduceBySpark reduceOperator, POLocalRearrange lrearrange) throws OptimizerException {
- Packager pkgr = reduceOperator.getPkgr();
+ Packager pkgr = reduceOperator.getPkg().getPkgr();
// annotate the package with information from the LORearrange
// update the keyInfo information if already present in the POPackage
Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = pkgr.getKeyInfo();