Posted to commits@pig.apache.org by pr...@apache.org on 2014/11/10 11:00:15 UTC

svn commit: r1637812 - in /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark: SparkLauncher.java converter/CounterConverter.java converter/RankConverter.java

Author: praveen
Date: Mon Nov 10 10:00:14 2014
New Revision: 1637812

URL: http://svn.apache.org/r1637812
Log:
PIG-4231: Make rank work with Spark (carlos balduz via praveen)
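
For context: Pig implements RANK with two physical operators. POCounter
tags each tuple with its partition index and a partition-local counter,
and PORank turns those local counters into global ranks by adding the
tuple counts of all earlier partitions. A minimal sketch of that
arithmetic on plain Java collections (the partition contents are made
up for illustration):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch of the two-phase rank arithmetic; data is illustrative only.
    public class RankSketch {
        public static void main(String[] args) {
            // Phase 1 (POCounter): each partition numbers its tuples locally.
            List<List<String>> partitions = Arrays.asList(
                    Arrays.asList("a", "b", "c"),  // local counters 1..3
                    Arrays.asList("d", "e"));      // local counters 1..2

            // RankConverter collects each partition's tuple count...
            Map<Integer, Long> counts = new HashMap<Integer, Long>();
            for (int p = 0; p < partitions.size(); p++) {
                counts.put(p, (long) partitions.get(p).size());
            }

            // ...and phase 2 (PORank) adds the counts of all earlier
            // partitions to each tuple's local counter.
            for (int p = 0; p < partitions.size(); p++) {
                long offset = 0;
                for (int i = 0; i < p; i++) {
                    offset += counts.get(i);
                }
                for (int local = 1; local <= partitions.get(p).size(); local++) {
                    System.out.println(partitions.get(p).get(local - 1)
                            + " -> rank " + (offset + local));  // a..e -> 1..5
                }
            }
        }
    }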

Added:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java
Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1637812&r1=1637811&r2=1637812&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Mon Nov 10 10:00:14 2014
@@ -23,6 +23,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
@@ -31,6 +32,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -38,6 +40,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.CollectedGroupConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.CounterConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.DistinctConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.FilterConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.ForEachConverter;
@@ -47,6 +50,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.LocalRearrangeConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.POConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.RankConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.SkewedJoinConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.SortConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.SplitConverter;
@@ -59,7 +63,6 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.SparkStats;
-
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.scheduler.JobLogger;
 import org.apache.spark.scheduler.StatsReportListener;
@@ -139,6 +142,8 @@ public class SparkLauncher extends Launc
         convertMap.put(POSplit.class, new SplitConverter());
         convertMap.put(POSkewedJoin.class, new SkewedJoinConverter());
         convertMap.put(POCollectedGroup.class, new CollectedGroupConverter());
+        convertMap.put(POCounter.class, new CounterConverter());
+        convertMap.put(PORank.class, new RankConverter());
 
         Map<OperatorKey, RDD<Tuple>> rdds = new HashMap<OperatorKey, RDD<Tuple>>();
 
@@ -285,4 +290,4 @@ public class SparkLauncher extends Launc
         // TODO Auto-generated method stub
 
     }
-}
+}
\ No newline at end of file

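The two convertMap entries above register the new converters with the
launcher's class-keyed dispatch table. A minimal sketch of how a lookup
of that shape typically works (Op, RankOp and Converter below are
hypothetical stand-ins, not the actual SparkLauncher internals):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative class-keyed converter dispatch in the style of convertMap.
    public class DispatchSketch {
        interface Op {}
        static class RankOp implements Op {}

        interface Converter {
            String convert(Op op);
        }

        public static void main(String[] args) {
            Map<Class<? extends Op>, Converter> convertMap =
                    new HashMap<Class<? extends Op>, Converter>();
            convertMap.put(RankOp.class, new Converter() {
                @Override
                public String convert(Op op) {
                    return "converted " + op.getClass().getSimpleName();
                }
            });

            // Dispatch on the operator's concrete class; unknown operators
            // surface as an explicit error rather than a silent no-op.
            Op op = new RankOp();
            Converter c = convertMap.get(op.getClass());
            if (c == null) {
                throw new IllegalArgumentException(
                        "no converter for " + op.getClass().getName());
            }
            System.out.println(c.convert(op));  // prints: converted RankOp
        }
    }
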
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java?rev=1637812&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java Mon Nov 10 10:00:14 2014
@@ -0,0 +1,120 @@
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.rdd.RDD;
+
+public class CounterConverter implements POConverter<Tuple, Tuple, POCounter> {
+
+	private static final Log LOG = LogFactory.getLog(CounterConverter.class);
+	
+	@Override
+	public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+			POCounter poCounter) throws IOException {
+		SparkUtil.assertPredecessorSize(predecessors, poCounter, 1);
+		RDD<Tuple> rdd = predecessors.get(0);
+		// Number the tuples within each partition, tagging every output
+		// tuple with the index of the partition it came from.
+		CounterConverterFunction f = new CounterConverterFunction(poCounter);
+		JavaRDD<Tuple> jRdd = rdd.toJavaRDD().mapPartitionsWithIndex(f, true);
+		return jRdd.rdd();
+	}
+	
+	@SuppressWarnings("serial")
+	private static class CounterConverterFunction implements 
+		Function2<Integer, Iterator<Tuple>, Iterator<Tuple>>, Serializable {
+
+		private final POCounter poCounter;
+		private long localCount = 1L;
+		private long sparkCount = 0L;
+		
+		private CounterConverterFunction(POCounter poCounter) {
+			this.poCounter = poCounter;
+		}
+		
+		@Override
+		public Iterator<Tuple> call(Integer index,
+				final Iterator<Tuple> input) {
+	        Tuple inp = null;
+	        Tuple output = null;
+	        long sizeBag = 0L;
+
+	        List<Tuple> listOutput = new ArrayList<Tuple>();
+	        
+	        try {
+	        	while (input.hasNext()) {
+					inp = input.next();
+					// Output layout: [partition index, spark counter,
+					// local counter, original fields...].
+					output = TupleFactory.getInstance()
+							.newTuple(inp.getAll().size() + 3);
+					
+					for (int i = 0; i < inp.getAll().size(); i++) {
+						output.set(i + 3, inp.get(i));
+					}
+					
+					if (poCounter.isRowNumber() || poCounter.isDenseRank()) {
+						// ROW_NUMBER and DENSE_RANK advance by one per tuple.
+						output.set(2, getLocalCounter());
+						incrementSparkCounter();
+						incrementLocalCounter();
+					} else {
+						// Plain RANK advances by the number of tied tuples,
+						// i.e. the size of the bag in the last input field.
+						int positionBag = inp.getAll().size() - 1;
+						if (inp.getType(positionBag) == DataType.BAG) {
+							sizeBag = ((org.apache.pig.data.DefaultAbstractBag)
+									inp.get(positionBag)).size();
+						}
+						
+						output.set(2, getLocalCounter());
+						
+						addToSparkCounter(sizeBag);
+						addToLocalCounter(sizeBag);
+					}
+					
+					output.set(0, index);
+					output.set(1, getSparkCounter());
+					listOutput.add(output);
+				}
+	        } catch (ExecException e) {
+	        	throw new RuntimeException(e);
+	        }
+			
+			return listOutput.iterator();
+		}
+		
+		private long getLocalCounter() {
+			return localCount;
+		}
+		
+		private long incrementLocalCounter() {
+			return localCount++;
+		}
+		
+		private long addToLocalCounter(long amount) {
+			return localCount += amount;
+		}
+		
+		private long getSparkCounter() {
+			return sparkCount;
+		}
+		
+		private long incrementSparkCounter() {
+			return sparkCount++;
+		}
+		
+		private long addToSparkCounter(long amount) {
+			return sparkCount += amount;
+		}
+	}
+}
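
CounterConverter builds on JavaRDD.mapPartitionsWithIndex, which hands
the function both the partition's index and an iterator over that
partition's tuples; the index becomes field 0 of every output tuple and
the running counters become fields 1 and 2. A standalone sketch of the
same pattern on strings (local-mode Spark, made-up data):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;

    // Standalone sketch of the mapPartitionsWithIndex pattern used by
    // CounterConverter: tag each element with (partition index, local count).
    public class NumberByPartition {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setMaster("local[2]").setAppName("NumberByPartition");
            JavaSparkContext sc = new JavaSparkContext(conf);

            JavaRDD<String> data = sc.parallelize(
                    Arrays.asList("a", "b", "c", "d", "e"), 2);

            JavaRDD<String> numbered = data.mapPartitionsWithIndex(
                    new Function2<Integer, Iterator<String>, Iterator<String>>() {
                        @Override
                        public Iterator<String> call(Integer index,
                                Iterator<String> input) {
                            List<String> out = new ArrayList<String>();
                            long localCount = 1L;
                            while (input.hasNext()) {
                                out.add("(" + index + "," + localCount++
                                        + ") " + input.next());
                            }
                            return out.iterator();
                        }
                    }, true);  // true = preserve partitioning, as above

            for (String s : numbered.collect()) {
                System.out.println(s);  // e.g. (0,1) a ... (1,2) e
            }
            sc.stop();
        }
    }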

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java?rev=1637812&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java Mon Nov 10 10:00:14 2014
@@ -0,0 +1,119 @@
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.rdd.RDD;
+
+import scala.Tuple2;
+
+public class RankConverter implements POConverter<Tuple, Tuple, PORank> {
+
+	private static final Log LOG = LogFactory.getLog(RankConverter.class);
+	
+	@Override
+	public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, PORank poRank)
+			throws IOException {
+		SparkUtil.assertPredecessorSize(predecessors, poRank, 1);
+		RDD<Tuple> rdd = predecessors.get(0);
+		// Extract the (partition index, spark counter) pair that POCounter
+		// wrote into fields 0 and 1 of every tuple.
+		JavaPairRDD<Integer, Long> javaPairRdd = rdd.toJavaRDD()
+				.mapToPair(new ToPairRdd());
+		// Reduce each partition index to the highest counter seen in it,
+		// i.e. the number of rank positions that partition contributes.
+		JavaPairRDD<Integer, Iterable<Long>> groupedByIndex = javaPairRdd
+				.groupByKey();
+		JavaPairRDD<Integer, Long> countsByIndex = groupedByIndex
+				.mapToPair(new IndexCounters());
+		JavaPairRDD<Integer, Long> sortedCountsByIndex = countsByIndex
+				.sortByKey(true);
+		// Collect the per-partition counts to the driver; RankFunction
+		// carries them into the tasks to turn local counters into ranks.
+		Map<Integer, Long> counts = sortedCountsByIndex.collectAsMap();
+		JavaRDD<Tuple> finalRdd = rdd.toJavaRDD()
+				.map(new RankFunction(new HashMap<Integer, Long>(counts)));
+		return finalRdd.rdd();
+	}
+
+	@SuppressWarnings("serial")
+	private static class ToPairRdd implements 
+		PairFunction<Tuple, Integer, Long>, Serializable {
+
+        @Override
+        public Tuple2<Integer, Long> call(Tuple t) {
+            try {
+                Integer key = (Integer) t.get(0);
+                Long value = (Long) t.get(1);
+                return new Tuple2<Integer, Long>(key, value);
+            } catch (ExecException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+	
+	@SuppressWarnings("serial")
+	private static class IndexCounters implements 
+		PairFunction<Tuple2<Integer, Iterable<Long>>, Integer, Long>, 
+		Serializable {
+		@Override
+		public Tuple2<Integer, Long> call(
+				Tuple2<Integer, Iterable<Long>> input) {
+			// The highest counter observed for a partition equals the
+			// number of rank positions that partition covers.
+			long lastValue = 0L;
+			
+			for (Long t : input._2()) {
+				lastValue = (t > lastValue) ? t : lastValue;
+			}
+
+			return new Tuple2<Integer, Long>(input._1(), lastValue);
+		}
+    }
+	
+	@SuppressWarnings("serial")
+	private static class RankFunction implements Function<Tuple, Tuple>, 
+			Serializable {
+		private final HashMap<Integer, Long> counts;
+		
+		private RankFunction(HashMap<Integer, Long> counts) {
+			this.counts = counts;
+		}
+		
+		@Override
+		public Tuple call(Tuple input) throws Exception {
+			// Input layout: [partition index, spark counter, local counter,
+			// original fields...]; output layout: [rank, original fields...].
+			Tuple output = TupleFactory.getInstance()
+					.newTuple(input.getAll().size() - 2);
+			
+			for (int i = 1; i < input.getAll().size() - 2; i++) {
+				output.set(i, input.get(i + 2));
+			}
+			
+			// Rank = tuples in all earlier partitions + the local counter.
+			long offset = calculateOffset((Integer) input.get(0));
+			output.set(0, offset + (Long) input.get(2));
+			return output;
+		}
+		
+		private long calculateOffset(Integer index) {
+			long offset = 0;
+			
+			if (index > 0) {
+				for (int i = 0; i < index; i++) {
+					if (counts.containsKey(i)) {
+						offset += counts.get(i);
+					}
+				}
+			}
+			return offset;
+		}
+	}
+}
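
One detail ties the two files together: for plain RANK (neither row
number nor dense rank), POCounter's input carries a bag of tied tuples
in its last field, and CounterConverter advances the counters by the
bag's size rather than by one. That is what makes tied rows share a
rank and the next distinct value skip ahead once RankFunction adds the
partition offsets. A small sketch of that counter behavior (values and
bag sizes are made up):

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Sketch of plain RANK's tie handling: the counter advances by the
    // bag size (addToLocalCounter(sizeBag)), so ties share a rank.
    public class TieSketch {
        public static void main(String[] args) {
            // value -> number of tied rows (the size of the bag in the
            // tuple's last field); illustrative data.
            Map<Integer, Integer> bags = new LinkedHashMap<Integer, Integer>();
            bags.put(10, 1);
            bags.put(20, 2);
            bags.put(30, 1);

            long localCount = 1L;  // plays the role of localCount above
            for (Map.Entry<Integer, Integer> bag : bags.entrySet()) {
                System.out.println("value " + bag.getKey()
                        + " -> rank " + localCount);
                localCount += bag.getValue();
            }
            // Prints: value 10 -> rank 1, value 20 -> rank 2,
            //         value 30 -> rank 4 (rank 3 is absorbed by the tie).
        }
    }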