Posted to commits@pig.apache.org by xu...@apache.org on 2016/10/17 18:18:43 UTC

svn commit: r1765346 - in /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter: JoinGroupSparkConverter.java LocalRearrangeFunction.java ReduceByConverter.java

Author: xuefu
Date: Mon Oct 17 18:18:43 2016
New Revision: 1765346

URL: http://svn.apache.org/viewvc?rev=1765346&view=rev
Log:
PIG-4969: Optimize combine case for spark mode (Part 2) (Liyun via Xuefu)

Removed:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeFunction.java
Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java?rev=1765346&r1=1765345&r2=1765346&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java Mon Oct 17 18:18:43 2016
@@ -17,13 +17,13 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.spark.converter;
 
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.pig.backend.executionengine.ExecException;
 import scala.Product2;
 import scala.Tuple2;
 import scala.collection.JavaConversions;
@@ -44,7 +44,7 @@ import org.apache.pig.impl.io.PigNullabl
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.rdd.CoGroupedRDD;
 import org.apache.spark.rdd.RDD;
-
+import scala.runtime.AbstractFunction1;
 
 public class JoinGroupSparkConverter implements RDDConverter<Tuple, Tuple, POJoinGroupSpark> {
     private static final Log LOG = LogFactory
@@ -63,8 +63,7 @@ public class JoinGroupSparkConverter imp
 
         for (int i = 0; i < predecessors.size(); i++) {
             RDD<Tuple> rdd = predecessors.get(i);
-            rddAfterLRA.add(rdd.map(new LocalRearrangeFunction(lraOps.get(i), glaOp.isUseSecondaryKey(), glaOp
-                            .getSecondarySortOrder()),
+            rddAfterLRA.add(rdd.map(new LocalRearrangeFunction(lraOps.get(i), glaOp),
                     SparkUtil.<IndexedKey, Tuple>getTuple2Manifest()));
         }
         if (rddAfterLRA.size() == 1 && useSecondaryKey) {
@@ -83,6 +82,67 @@ public class JoinGroupSparkConverter imp
         }
     }
 
+    private static class LocalRearrangeFunction extends
+            AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements Serializable {
+
+        private final POLocalRearrange lra;
+
+        private boolean useSecondaryKey;
+        private boolean[] secondarySortOrder;
+
+        public LocalRearrangeFunction(POLocalRearrange lra, POGlobalRearrangeSpark glaOp) {
+            if( glaOp.isUseSecondaryKey()) {
+                this.useSecondaryKey = glaOp.isUseSecondaryKey();
+                this.secondarySortOrder = glaOp.getSecondarySortOrder();
+            }
+            this.lra = lra;
+        }
+
+        @Override
+        public Tuple2<IndexedKey, Tuple> apply(Tuple t) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("LocalRearrangeFunction in " + t);
+            }
+            Result result;
+            try {
+                lra.setInputs(null);
+                lra.attachInput(t);
+                result = lra.getNextTuple();
+
+                if (result == null) {
+                    throw new RuntimeException(
+                            "Null response found for LocalRearrange on tuple: "
+                                    + t);
+                }
+
+                switch (result.returnStatus) {
+                    case POStatus.STATUS_OK:
+                        // (index, key, value without keys)
+                        Tuple resultTuple = (Tuple) result.result;
+                        Object key = resultTuple.get(1);
+                        IndexedKey indexedKey = new IndexedKey((Byte) resultTuple.get(0), key);
+                        if( useSecondaryKey) {
+                            indexedKey.setUseSecondaryKey(useSecondaryKey);
+                            indexedKey.setSecondarySortOrder(secondarySortOrder);
+                        }
+                        Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(indexedKey,
+                                (Tuple) resultTuple.get(2));
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("LocalRearrangeFunction out " + out);
+                        }
+                        return out;
+                    default:
+                        throw new RuntimeException(
+                                "Unexpected response code from operator "
+                                        + lra + " : " + result);
+                }
+            } catch (ExecException e) {
+                throw new RuntimeException(
+                        "Couldn't do LocalRearrange on tuple: " + t, e);
+            }
+        }
+
+    }
 
     /**
      * Send cogroup output where each element is {key, bag[]} to PoPackage
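
For context on the JoinGroupSparkConverter change above: the LocalRearrangeFunction that previously lived in its own file (removed in this revision) is now a private static class of the converter, and it reads the secondary-sort settings from the POGlobalRearrangeSpark operator instead of taking them as separate constructor arguments. A minimal standalone sketch of the record shape it operates on follows; it is illustrative only, assumes the Pig tuple API on the classpath, and the class name JoinGroupRearrangeShapeSketch is made up rather than taken from the commit.

    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    // Illustrative sketch (not part of r1765346). The real
    // JoinGroupSparkConverter.LocalRearrangeFunction maps
    //   (index, key, value-with-keys-stripped)  ->  Tuple2<IndexedKey(index, key), value>.
    // Here we only build a sample input tuple and print the pieces it is split into.
    public class JoinGroupRearrangeShapeSketch {
        public static void main(String[] args) throws Exception {
            TupleFactory tf = TupleFactory.getInstance();

            // Sample output of POLocalRearrange: (index, key, value with keys stripped)
            Tuple rearranged = tf.newTuple(3);
            rearranged.set(0, (byte) 0);           // input (relation) index
            rearranged.set(1, "k1");               // group/join key
            rearranged.set(2, tf.newTuple("v1"));  // remaining value fields

            Object key = rearranged.get(1);
            Tuple value = (Tuple) rearranged.get(2);
            System.out.println("key   = (" + rearranged.get(0) + ", " + key + ")");
            System.out.println("value = " + value);
        }
    }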

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java?rev=1765346&r1=1765345&r2=1765346&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java Mon Oct 17 18:18:43 2016
@@ -28,7 +28,9 @@ import scala.runtime.AbstractFunction2;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
 import org.apache.pig.data.DataBag;
@@ -249,4 +251,74 @@ public class ReduceByConverter implement
             return packagedTuple;
         }
     }
+
+    /**
+     * Converts incoming locally rearranged tuple, which is of the form
+     * (index, key, value) into Tuple2<key, Tuple(key, value)>
+     */
+    private static class LocalRearrangeFunction extends
+            AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements Serializable {
+
+        private final POLocalRearrange lra;
+
+        private boolean useSecondaryKey;
+        private boolean[] secondarySortOrder;
+
+        public LocalRearrangeFunction(POLocalRearrange lra, boolean useSecondaryKey, boolean[] secondarySortOrder) {
+            if( useSecondaryKey ) {
+                this.useSecondaryKey = useSecondaryKey;
+                this.secondarySortOrder = secondarySortOrder;
+            }
+            this.lra = lra;
+        }
+
+        @Override
+        public Tuple2<IndexedKey, Tuple> apply(Tuple t) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("LocalRearrangeFunction in " + t);
+            }
+            Result result;
+            try {
+                lra.setInputs(null);
+                lra.attachInput(t);
+                result = lra.getNextTuple();
+
+                if (result == null) {
+                    throw new RuntimeException(
+                            "Null response found for LocalRearrange on tuple: "
+                                    + t);
+                }
+
+                switch (result.returnStatus) {
+                    case POStatus.STATUS_OK:
+                        // (index, key, Tuple(key, value))
+                        Tuple resultTuple = (Tuple) result.result;
+                        Object key = resultTuple.get(1);
+                        IndexedKey indexedKey = new IndexedKey((Byte) resultTuple.get(0), key);
+                        if( useSecondaryKey) {
+                            indexedKey.setUseSecondaryKey(useSecondaryKey);
+                            indexedKey.setSecondarySortOrder(secondarySortOrder);
+                        }
+                        Tuple outValue =  TupleFactory.getInstance().newTuple();
+                        outValue.append(key);
+                        outValue.append(resultTuple.get(2));
+                        Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(indexedKey,
+                               outValue);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("LocalRearrangeFunction out " + out);
+                        }
+                        return out;
+                    default:
+                        throw new RuntimeException(
+                                "Unexpected response code from operator "
+                                        + lra + " : " + result);
+                }
+            } catch (ExecException e) {
+                throw new RuntimeException(
+                        "Couldn't do LocalRearrange on tuple: " + t, e);
+            }
+        }
+
+    }
+
 }
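
The LocalRearrangeFunction added to ReduceByConverter differs from the JoinGroupSparkConverter variant in one detail: per its javadoc, the emitted value is Tuple(key, value) rather than the bare value, presumably so the key is still available inside the value when POReduceBySpark merges records. Below is a minimal standalone sketch of that shape, again assuming only the Pig tuple API; the class name ReduceByRearrangeShapeSketch is made up and not part of this revision.

    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    // Illustrative sketch (not part of r1765346). The real
    // ReduceByConverter.LocalRearrangeFunction maps
    //   (index, key, value)  ->  Tuple2<IndexedKey(index, key), (key, value)>,
    // i.e. the key is repeated inside the emitted value. Below we build a sample
    // input and assemble the (key, value) tuple the same way the diff above does.
    public class ReduceByRearrangeShapeSketch {
        public static void main(String[] args) throws Exception {
            TupleFactory tf = TupleFactory.getInstance();

            // Sample output of POLocalRearrange: (index, key, value)
            Tuple rearranged = tf.newTuple(3);
            rearranged.set(0, (byte) 0);           // input index
            rearranged.set(1, "k1");               // group key
            rearranged.set(2, tf.newTuple("v1"));  // value fields

            // (key, value), mirroring outValue in the diff above
            Tuple outValue = tf.newTuple();
            outValue.append(rearranged.get(1));
            outValue.append(rearranged.get(2));
            System.out.println("key   = (" + rearranged.get(0) + ", " + rearranged.get(1) + ")");
            System.out.println("value = " + outValue);
        }
    }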