You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2016/10/17 18:18:43 UTC
svn commit: r1765346 - in
/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter:
JoinGroupSparkConverter.java LocalRearrangeFunction.java
ReduceByConverter.java
Author: xuefu
Date: Mon Oct 17 18:18:43 2016
New Revision: 1765346
URL: http://svn.apache.org/viewvc?rev=1765346&view=rev
Log:
PIG-4969: Optimize combine case for spark mode (Part 2) (Liyun via Xuefu)
Removed:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeFunction.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java?rev=1765346&r1=1765345&r2=1765346&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java Mon Oct 17 18:18:43 2016
@@ -17,13 +17,13 @@
*/
package org.apache.pig.backend.hadoop.executionengine.spark.converter;
-
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import org.apache.pig.backend.executionengine.ExecException;
import scala.Product2;
import scala.Tuple2;
import scala.collection.JavaConversions;
@@ -44,7 +44,7 @@ import org.apache.pig.impl.io.PigNullabl
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.CoGroupedRDD;
import org.apache.spark.rdd.RDD;
-
+import scala.runtime.AbstractFunction1;
public class JoinGroupSparkConverter implements RDDConverter<Tuple, Tuple, POJoinGroupSpark> {
private static final Log LOG = LogFactory
@@ -63,8 +63,7 @@ public class JoinGroupSparkConverter imp
for (int i = 0; i < predecessors.size(); i++) {
RDD<Tuple> rdd = predecessors.get(i);
- rddAfterLRA.add(rdd.map(new LocalRearrangeFunction(lraOps.get(i), glaOp.isUseSecondaryKey(), glaOp
- .getSecondarySortOrder()),
+ rddAfterLRA.add(rdd.map(new LocalRearrangeFunction(lraOps.get(i), glaOp),
SparkUtil.<IndexedKey, Tuple>getTuple2Manifest()));
}
if (rddAfterLRA.size() == 1 && useSecondaryKey) {
@@ -83,6 +82,67 @@ public class JoinGroupSparkConverter imp
}
}
+    /**
+     * Spark map function that runs a {@link POLocalRearrange} on each input
+     * tuple and converts the operator's (index, key, value) output into a
+     * {@code Tuple2<IndexedKey, Tuple>} keyed by {@code IndexedKey(index, key)}.
+     */
+    private static class LocalRearrangeFunction extends
+            AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements Serializable {
+
+        private final POLocalRearrange lra;
+
+        // Secondary-key settings taken from the global rearrange; the sort
+        // order is only retained when secondary keys are enabled.
+        private final boolean useSecondaryKey;
+        private final boolean[] secondarySortOrder;
+
+        /**
+         * @param lra   local rearrange operator applied to every input tuple
+         * @param glaOp global rearrange whose secondary-key settings are copied
+         *              onto every emitted {@link IndexedKey}
+         */
+        public LocalRearrangeFunction(POLocalRearrange lra, POGlobalRearrangeSpark glaOp) {
+            this.lra = lra;
+            this.useSecondaryKey = glaOp.isUseSecondaryKey();
+            this.secondarySortOrder = useSecondaryKey ? glaOp.getSecondarySortOrder() : null;
+        }
+
+        /**
+         * Runs the local rearrange on {@code t}.
+         *
+         * @param t input tuple
+         * @return {@code (IndexedKey(index, key), value)}, where {@code value}
+         *         is field 2 of the operator's result tuple
+         * @throws RuntimeException if the operator returns {@code null}, a
+         *         status other than {@code STATUS_OK}, or throws
+         *         {@link ExecException}
+         */
+        @Override
+        public Tuple2<IndexedKey, Tuple> apply(Tuple t) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("LocalRearrangeFunction in " + t);
+            }
+            Result result;
+            try {
+                // Clear the operator's inputs and attach this tuple directly
+                // (presumably so it processes only the attached tuple rather
+                // than pulling from upstream operators).
+                lra.setInputs(null);
+                lra.attachInput(t);
+                result = lra.getNextTuple();
+
+                if (result == null) {
+                    throw new RuntimeException(
+                            "Null response found for LocalRearange on tuple: "
+                                    + t);
+                }
+
+                switch (result.returnStatus) {
+                case POStatus.STATUS_OK:
+                    // Operator output is (index, key, value without keys).
+                    Tuple resultTuple = (Tuple) result.result;
+                    Object key = resultTuple.get(1);
+                    IndexedKey indexedKey = new IndexedKey((Byte) resultTuple.get(0), key);
+                    if (useSecondaryKey) {
+                        indexedKey.setUseSecondaryKey(true);
+                        indexedKey.setSecondarySortOrder(secondarySortOrder);
+                    }
+                    Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(indexedKey,
+                            (Tuple) resultTuple.get(2));
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("LocalRearrangeFunction out " + out);
+                    }
+                    return out;
+                default:
+                    throw new RuntimeException(
+                            "Unexpected response code from operator "
+                                    + lra + " : " + result);
+                }
+            } catch (ExecException e) {
+                throw new RuntimeException(
+                        "Couldn't do LocalRearange on tuple: " + t, e);
+            }
+        }
+
+    }
/**
* Send cogroup output where each element is {key, bag[]} to PoPackage
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java?rev=1765346&r1=1765345&r2=1765346&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java Mon Oct 17 18:18:43 2016
@@ -28,7 +28,9 @@ import scala.runtime.AbstractFunction2;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
import org.apache.pig.data.DataBag;
@@ -249,4 +251,74 @@ public class ReduceByConverter implement
return packagedTuple;
}
}
+
+ /**
+ * Converts incoming locally rearranged tuple, which is of the form
+ * (index, key, value) into Tuple2<key, Tuple(key, value)>
+ */
+ private static class LocalRearrangeFunction extends
+ AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements Serializable {
+
+ private final POLocalRearrange lra;
+
+ private boolean useSecondaryKey;
+ private boolean[] secondarySortOrder;
+
+ public LocalRearrangeFunction(POLocalRearrange lra, boolean useSecondaryKey, boolean[] secondarySortOrder) {
+ if( useSecondaryKey ) {
+ this.useSecondaryKey = useSecondaryKey;
+ this.secondarySortOrder = secondarySortOrder;
+ }
+ this.lra = lra;
+ }
+
+ @Override
+ public Tuple2<IndexedKey, Tuple> apply(Tuple t) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("LocalRearrangeFunction in " + t);
+ }
+ Result result;
+ try {
+ lra.setInputs(null);
+ lra.attachInput(t);
+ result = lra.getNextTuple();
+
+ if (result == null) {
+ throw new RuntimeException(
+ "Null response found for LocalRearange on tuple: "
+ + t);
+ }
+
+ switch (result.returnStatus) {
+ case POStatus.STATUS_OK:
+ // (index, key, Tuple(key, value))
+ Tuple resultTuple = (Tuple) result.result;
+ Object key = resultTuple.get(1);
+ IndexedKey indexedKey = new IndexedKey((Byte) resultTuple.get(0), key);
+ if( useSecondaryKey) {
+ indexedKey.setUseSecondaryKey(useSecondaryKey);
+ indexedKey.setSecondarySortOrder(secondarySortOrder);
+ }
+ Tuple outValue = TupleFactory.getInstance().newTuple();
+ outValue.append(key);
+ outValue.append(resultTuple.get(2));
+ Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(indexedKey,
+ outValue);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("LocalRearrangeFunction out " + out);
+ }
+ return out;
+ default:
+ throw new RuntimeException(
+ "Unexpected response code from operator "
+ + lra + " : " + result);
+ }
+ } catch (ExecException e) {
+ throw new RuntimeException(
+ "Couldn't do LocalRearange on tuple: " + t, e);
+ }
+ }
+
+ }
+
}