You are viewing a plain text version of this content; the hyperlink to the canonical message was removed in conversion. The canonical copy is available in the Apache commits@pig.apache.org mailing-list archive.
Posted to commits@pig.apache.org by pr...@apache.org on 2014/11/10 11:00:15 UTC
svn commit: r1637812 - in
/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark:
SparkLauncher.java converter/CounterConverter.java
converter/RankConverter.java
Author: praveen
Date: Mon Nov 10 10:00:14 2014
New Revision: 1637812
URL: http://svn.apache.org/r1637812
Log:
PIG-4231: Make rank work with Spark (carlos balduz via praveen)
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1637812&r1=1637811&r2=1637812&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Mon Nov 10 10:00:14 2014
@@ -23,6 +23,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
@@ -31,6 +32,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -38,6 +40,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.CollectedGroupConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.CounterConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.DistinctConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.FilterConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.ForEachConverter;
@@ -47,6 +50,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.converter.LocalRearrangeConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.POConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.RankConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.SkewedJoinConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.SortConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.SplitConverter;
@@ -59,7 +63,6 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.SparkStats;
-
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.JobLogger;
import org.apache.spark.scheduler.StatsReportListener;
@@ -139,6 +142,8 @@ public class SparkLauncher extends Launc
convertMap.put(POSplit.class, new SplitConverter());
convertMap.put(POSkewedJoin.class, new SkewedJoinConverter());
convertMap.put(POCollectedGroup.class, new CollectedGroupConverter());
+ convertMap.put(POCounter.class, new CounterConverter());
+ convertMap.put(PORank.class, new RankConverter());
Map<OperatorKey, RDD<Tuple>> rdds = new HashMap<OperatorKey, RDD<Tuple>>();
@@ -285,4 +290,4 @@ public class SparkLauncher extends Launc
// TODO Auto-generated method stub
}
-}
+}
\ No newline at end of file
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java?rev=1637812&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java Mon Nov 10 10:00:14 2014
@@ -0,0 +1,120 @@
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.rdd.RDD;
+
+/**
+ * Converts a {@code POCounter} physical operator into a Spark transformation.
+ * Each input tuple is prefixed with three fields: [0] = Spark partition index,
+ * [1] = running per-partition "spark" counter, [2] = per-partition local
+ * (1-based) counter. A later PORank/RankConverter stage consumes this layout
+ * to compute the final rank.
+ */
+public class CounterConverter implements POConverter<Tuple, Tuple, POCounter> {
+
+ private static final Log LOG = LogFactory.getLog(CounterConverter.class);
+
+ @Override
+ public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+ POCounter poCounter) throws IOException {
+ // POCounter is a unary operator: exactly one upstream RDD expected.
+ SparkUtil.assertPredecessorSize(predecessors, poCounter, 1);
+ RDD<Tuple> rdd = predecessors.get(0);
+ CounterConverterFunction f = new CounterConverterFunction(poCounter);
+ // mapPartitionsWithIndex gives us the partition index needed for
+ // field [0]; preservesPartitioning=true since keys are untouched.
+ JavaRDD<Tuple> jRdd = rdd.toJavaRDD().mapPartitionsWithIndex(f, true);
+// jRdd = jRdd.cache();
+ return jRdd.rdd();
+ }
+
+ /**
+ * Per-partition counting function. Serializable because Spark ships it
+ * to executors; counter state is therefore per-task, not global.
+ */
+ @SuppressWarnings("serial")
+ private static class CounterConverterFunction implements
+ Function2<Integer, Iterator<Tuple>, Iterator<Tuple>>, Serializable {
+
+ private final POCounter poCounter;
+ // localCount is 1-based (rank values start at 1); sparkCount tracks
+ // how many rows (or bag elements) have been consumed so far.
+ private long localCount = 1L;
+ private long sparkCount = 0L;
+
+ private CounterConverterFunction(POCounter poCounter) {
+ this.poCounter = poCounter;
+ }
+
+ @Override
+ public Iterator<Tuple> call(Integer index, final
+ Iterator<Tuple> input) {
+ Tuple inp = null;
+ Tuple output = null;
+ long sizeBag = 0L;
+
+ // Partition contents are materialized into a list; NOTE(review):
+ // this buffers the whole partition in memory — a lazy iterator
+ // would be preferable for large partitions.
+ List<Tuple> listOutput = new ArrayList<Tuple>();
+
+ try {
+ while (input.hasNext()) {
+ inp = input.next();
+ // New tuple = original fields shifted right by 3 slots.
+ output = TupleFactory.getInstance()
+ .newTuple(inp.getAll().size() + 3);
+
+ for (int i = 0; i < inp.getAll().size(); i++) {
+ output.set(i + 3, inp.get(i));
+ }
+
+ if (poCounter.isRowNumber() || poCounter.isDenseRank()) {
+ // ROW_NUMBER / DENSE_RANK: every row advances both
+ // counters by exactly one.
+ output.set(2, getLocalCounter());
+ incrementSparkCounter();
+ incrementLocalCounter();
+ } else if (!poCounter.isDenseRank()) {
+ // NOTE(review): this condition is always true when
+ // reached — dense rank was handled above — so a plain
+ // else would be equivalent.
+ // Regular RANK: counters advance by the size of the
+ // grouped bag (ties share a rank, next rank skips).
+ int positionBag = inp.getAll().size()-1;
+ if (inp.getType(positionBag) == DataType.BAG) {
+ sizeBag = ((org.apache.pig.data.DefaultAbstractBag)
+ inp.get(positionBag)).size();
+ }
+
+ output.set(2, getLocalCounter());
+
+ addToSparkCounter(sizeBag);
+ addToLocalCounter(sizeBag);
+ }
+
+ output.set(0, index);
+ output.set(1, getSparkCounter());
+ listOutput.add(output);
+ }
+ } catch(ExecException e) {
+ // Tuple accessors throw checked ExecException; rethrow
+ // unchecked since Function2.call cannot declare it here.
+ throw new RuntimeException(e);
+ }
+
+
+ return listOutput.iterator();
+ }
+
+ private long getLocalCounter() {
+ return localCount;
+ }
+
+ // Post-increment: returns the value BEFORE incrementing (return
+ // value is unused by callers above).
+ private long incrementLocalCounter() {
+ return localCount++;
+ }
+
+ private long addToLocalCounter(long amount) {
+ return localCount += amount;
+ }
+
+ private long getSparkCounter() {
+ return sparkCount;
+ }
+
+ // Post-increment: returns the value BEFORE incrementing (return
+ // value is unused by callers above).
+ private long incrementSparkCounter() {
+ return sparkCount++;
+ }
+
+ private long addToSparkCounter(long amount) {
+ return sparkCount += amount;
+ }
+ }
+}
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java?rev=1637812&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java Mon Nov 10 10:00:14 2014
@@ -0,0 +1,119 @@
+/**
+ *
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.rdd.RDD;
+
+import scala.Tuple2;
+
+/**
+ * Converts a {@code PORank} physical operator into Spark transformations.
+ * Expects input tuples in the layout produced by CounterConverter/POCounter:
+ * [0] = partition index, [1] = per-partition running counter, [2] = local
+ * (1-based) counter, [3..] = the user's data fields. The final rank of a row
+ * is its local counter plus the total count of all earlier partitions.
+ */
+public class RankConverter implements POConverter<Tuple, Tuple, PORank> {
+
+ private static final Log LOG = LogFactory.getLog(RankConverter.class);
+
+ @Override
+ public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, PORank poRank)
+ throws IOException {
+ // PORank is a unary operator: exactly one upstream RDD expected.
+ SparkUtil.assertPredecessorSize(predecessors, poRank, 1);
+ RDD<Tuple> rdd = predecessors.get(0);
+ // Pass 1: compute, per partition index, the highest running counter
+ // (i.e. the number of rows that partition contributes).
+ JavaPairRDD<Integer, Long> javaPairRdd = rdd.toJavaRDD()
+ .mapToPair(new ToPairRdd());
+ // NOTE(review): groupByKey + max could be a single reduceByKey with
+ // Math::max, avoiding shuffling every counter value.
+ JavaPairRDD<Integer, Iterable<Long>> groupedByIndex = javaPairRdd
+ .groupByKey();
+ JavaPairRDD<Integer, Long> countsByIndex = groupedByIndex
+ .mapToPair(new IndexCounters());
+ JavaPairRDD<Integer, Long> sortedCountsByIndex = countsByIndex
+ .sortByKey(true);
+ // Per-partition totals are small (one entry per partition), so they
+ // are collected to the driver and broadcast inside the closure.
+ Map<Integer, Long> counts = sortedCountsByIndex.collectAsMap();
+ // Pass 2: rewrite each tuple, replacing the three counter fields
+ // with the final global rank.
+ JavaRDD<Tuple> finalRdd = rdd.toJavaRDD()
+ .map(new RankFunction(new HashMap<Integer, Long>(counts)));
+ return finalRdd.rdd();
+ }
+
+ /** Extracts (partition index, running counter) from a counted tuple. */
+ @SuppressWarnings("serial")
+ private static class ToPairRdd implements
+ PairFunction<Tuple, Integer, Long>, Serializable {
+
+ @Override
+ public Tuple2<Integer, Long> call(Tuple t) {
+ try {
+ Integer key = (Integer) t.get(0);
+ Long value = (Long) t.get(1);
+ return new Tuple2<Integer, Long>(key, value);
+ } catch (ExecException e) {
+ // Tuple.get throws checked ExecException; rethrow unchecked.
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Reduces all counter values of one partition to their maximum, which
+ * equals that partition's total row (or bag-size) count.
+ */
+ @SuppressWarnings("serial")
+ private static class IndexCounters implements
+ PairFunction<Tuple2<Integer, Iterable<Long>>, Integer, Long>,
+ Serializable {
+ @Override
+ public Tuple2<Integer, Long> call(Tuple2<Integer,
+ Iterable<Long>> input) {
+ // NOTE(review): "lastVaue" is a typo for "lastValue" (cosmetic).
+ long lastVaue = 0L;
+
+ for (Long t : input._2()) {
+ lastVaue = (t > lastVaue) ? t : lastVaue;
+ }
+
+ return new Tuple2<Integer, Long>(input._1(), lastVaue);
+ }
+ }
+
+ /**
+ * Rewrites a counted tuple into its final ranked form: field [0] becomes
+ * the global rank (offset of preceding partitions + local counter) and
+ * the three bookkeeping fields [0..2] are dropped.
+ */
+ @SuppressWarnings("serial")
+ private static class RankFunction implements Function<Tuple, Tuple>,
+ Serializable {
+ private final HashMap<Integer, Long> counts;
+
+ private RankFunction(HashMap<Integer, Long> counts) {
+ this.counts = counts;
+ }
+
+ @Override
+ public Tuple call(Tuple input) throws Exception {
+ // Output drops the 3 counter fields and prepends 1 rank field,
+ // hence size n - 2.
+ Tuple output = TupleFactory.getInstance()
+ .newTuple(input.getAll().size() - 2);
+
+ // Copies data fields input[3..n-1] into output[1..n-3];
+ // output[0] is reserved for the rank set below.
+ for (int i = 1; i < input.getAll().size() - 2; i ++) {
+ output.set(i, input.get(i+2));
+ }
+
+ // Global rank = rows in all earlier partitions + local counter.
+ long offset = calculateOffset((Integer) input.get(0));
+ output.set(0, offset + (Long)input.get(2));
+ return output;
+ }
+
+ // Sums the totals of every partition with a smaller index. O(index)
+ // per tuple; fine while partition counts stay small.
+ private long calculateOffset(Integer index) {
+ long offset = 0;
+
+ if (index > 0) {
+ for (int i = 0; i < index; i++) {
+ if (counts.containsKey(i)) {
+ offset += counts.get(i);
+ }
+ }
+ }
+ return offset;
+ }
+ }
+}