Posted to commits@pig.apache.org by pr...@apache.org on 2014/10/10 13:48:54 UTC
svn commit: r1630768 - in /pig/branches/spark: ./ ivy/ src/org/apache/pig/backend/hadoop/executionengine/spark/converter/
Author: praveen
Date: Fri Oct 10 11:48:53 2014
New Revision: 1630768
URL: http://svn.apache.org/r1630768
Log:
PIG-4173: Upgrading spark version to 1.1.0 (richard ding via praveen)
Modified:
pig/branches/spark/ivy.xml
pig/branches/spark/ivy/libraries.properties
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
Modified: pig/branches/spark/ivy.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/ivy.xml?rev=1630768&r1=1630767&r2=1630768&view=diff
==============================================================================
--- pig/branches/spark/ivy.xml (original)
+++ pig/branches/spark/ivy.xml Fri Oct 10 11:48:53 2014
@@ -435,9 +435,17 @@
<dependency org="com.twitter" name="parquet-pig-bundle" rev="${parquet-pig-bundle.version}" conf="compile->master"/>
<!-- for Spark integration -->
- <dependency org="org.apache.spark" name="spark-core_2.10" rev="0.9.0-incubating" conf="compile->default">
+ <dependency org="org.apache.spark" name="spark-core_2.10" rev="${spark.version}" conf="compile->default">
<exclude org="org.eclipse.jetty.orbit" module="javax.servlet"/>
+ <exclude org="org.eclipse.jetty.orbit" module="javax.transaction"/>
+ <exclude org="org.eclipse.jetty.orbit" module="javax.mail.glassfish"/>
+ <exclude org="org.eclipse.jetty.orbit" module="javax.activation"/>
</dependency>
+ <dependency org="asm" name="asm" rev="${asm.version}" conf="compile->master">
+ </dependency>
+ <dependency org="javax.servlet" name="javax.servlet-api" rev="3.0.1"
+ conf="compile->master"/>
+
<!-- for Tez integration -->
<dependency org="org.apache.tez" name="tez" rev="${tez.version}"
Modified: pig/branches/spark/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/branches/spark/ivy/libraries.properties?rev=1630768&r1=1630767&r2=1630768&view=diff
==============================================================================
--- pig/branches/spark/ivy/libraries.properties (original)
+++ pig/branches/spark/ivy/libraries.properties Fri Oct 10 11:48:53 2014
@@ -17,6 +17,7 @@
accumulo15.version=1.5.0
apacheant.version=1.7.1
apacherat.version=0.8
+asm.version=3.2
automaton.version=1.11-8
avro.version=1.7.5
commons-beanutils.version=1.7.0
@@ -76,6 +77,7 @@ netty.version=3.2.2
rats-lib.version=0.5.1
slf4j-api.version=1.6.1
slf4j-log4j12.version=1.6.1
+spark.version=1.1.0
xerces.version=2.10.0
xalan.version=2.7.1
wagon-http.version=1.0-beta-2
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java?rev=1630768&r1=1630767&r2=1630768&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java Fri Oct 10 11:48:53 2014
@@ -1,21 +1,15 @@
package org.apache.pig.backend.hadoop.executionengine.spark.converter;
import java.io.IOException;
-import java.io.Serializable;
+import java.util.Iterator;
import java.util.List;
-import org.apache.pig.Main;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
-import org.apache.pig.backend.hadoop.executionengine.spark.BroadCastServer;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.plan.PlanException;
-
-import scala.collection.Iterator;
-import scala.collection.JavaConversions;
-import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
@@ -23,28 +17,29 @@ import org.apache.spark.rdd.RDD;
public class CollectedGroupConverter implements POConverter<Tuple, Tuple, POCollectedGroup> {
@Override
- public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POCollectedGroup physicalOperator) throws IOException {
- SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
- RDD<Tuple> rdd = predecessors.get(0);
- //return predecessors.get(0);
- RDD<Tuple> rdd2 = rdd.coalesce(1, false);
- long count = 0;
- try{
-
- count = rdd2.count();
- long ccount = 0;
-
- }catch(Exception e){
-
- }
- CollectedGroupFunction collectedGroupFunction = new CollectedGroupFunction(physicalOperator, count);
- return rdd.mapPartitions(collectedGroupFunction, true, SparkUtil.getManifest(Tuple.class));
- }
+ public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+ POCollectedGroup physicalOperator) throws IOException {
+ SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
+ RDD<Tuple> rdd = predecessors.get(0);
+ // return predecessors.get(0);
+ RDD<Tuple> rdd2 = rdd.coalesce(1, false, null);
+ long count = 0;
+ try {
+
+ count = rdd2.count();
+
+ } catch (Exception e) {
+
+ }
+ CollectedGroupFunction collectedGroupFunction
+ = new CollectedGroupFunction(physicalOperator, count);
+ return rdd.toJavaRDD().mapPartitions(collectedGroupFunction, true).rdd();
+ }
- private static class CollectedGroupFunction extends Function<Iterator<Tuple>, Iterator<Tuple>> implements Serializable {
+ private static class CollectedGroupFunction implements FlatMapFunction<Iterator<Tuple>, Tuple> {
/**
- *
+ *
*/
private POCollectedGroup poCollectedGroup;
@@ -56,38 +51,42 @@ public class CollectedGroupConverter imp
this.poCollectedGroup = poCollectedGroup;
this.total_limit = count;
this.current_val = 0;
-
}
- public Iterator<Tuple> call(Iterator<Tuple> i) {
- final java.util.Iterator<Tuple> input = JavaConversions.asJavaIterator(i);
- Iterator<Tuple> output = JavaConversions.asScalaIterator(new POOutputConsumerIterator(input) {
- protected void attach(Tuple tuple) {
- poCollectedGroup.setInputs(null);
- poCollectedGroup.attachInput(tuple);
- poCollectedGroup.setParentPlan(poCollectedGroup.getPlans().get(0));
- try{
-
- current_val = current_val + 1;
- //System.out.println("Row: =>" + current_val);
- if(current_val == total_limit){
- proceed = true;
- }else{
- proceed = false;
- }
-
- }catch(Exception e){
- System.out.println("Crashhh in CollectedGroupConverter :" + e);
- e.printStackTrace();
- }
- }
-
- protected Result getNextResult() throws ExecException {
- return poCollectedGroup.getNextTuple(proceed);
- }
- });
- return output;
+ public Iterable<Tuple> call(final Iterator<Tuple> input) {
+
+ return new Iterable<Tuple>() {
+
+ @Override
+ public Iterator<Tuple> iterator() {
+ return new POOutputConsumerIterator(input) {
+ protected void attach(Tuple tuple) {
+ poCollectedGroup.setInputs(null);
+ poCollectedGroup.attachInput(tuple);
+ poCollectedGroup.setParentPlan(poCollectedGroup.getPlans().get(0));
+
+ try{
+
+ current_val = current_val + 1;
+ //System.out.println("Row: =>" + current_val);
+ if (current_val == total_limit) {
+ proceed = true;
+ } else {
+ proceed = false;
+ }
+
+ } catch(Exception e){
+ System.out.println("Crashhh in CollectedGroupConverter :" + e);
+ e.printStackTrace();
+ }
+ }
+
+ protected Result getNextResult() throws ExecException {
+ return poCollectedGroup.getNextTuple(proceed);
+ }
+ };
+ }
+ };
}
}
-
}
\ No newline at end of file
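For context on the hunk above: the central API shift this patch tracks is that Spark 0.9's RDD.mapPartitions took a scala Function over scala.collection.Iterator (forcing JavaConversions round-trips), while Spark 1.1's Java API takes a FlatMapFunction over java.util.Iterator and returns an Iterable. A minimal sketch of the new shape, not part of the patch:

    import java.util.Iterator;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.FlatMapFunction;

    public class MapPartitionsSketch {
        // Identity pass-through over one partition, in the Spark 1.1 style
        // used by CollectedGroupFunction above.
        static class PassThrough<T> implements FlatMapFunction<Iterator<T>, T> {
            @Override
            public Iterable<T> call(final Iterator<T> input) {
                return new Iterable<T>() {
                    @Override
                    public Iterator<T> iterator() {
                        return input; // single-use iterator, as in the converters
                    }
                };
            }
        }

        static <T> JavaRDD<T> passThrough(JavaRDD<T> rdd) {
            // 'true' preserves partitioning, matching the converter's call.
            return rdd.mapPartitions(new PassThrough<T>(), true);
        }
    }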
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java?rev=1630768&r1=1630767&r2=1630768&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java Fri Oct 10 11:48:53 2014
@@ -9,16 +9,15 @@ import org.apache.commons.logging.LogFac
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
+import org.apache.spark.rdd.PairRDDFunctions;
+import org.apache.spark.rdd.RDD;
import scala.Function1;
import scala.Function2;
import scala.Tuple2;
-//import scala.reflect.ClassManifest;
import scala.reflect.ClassTag;
import scala.runtime.AbstractFunction1;
import scala.runtime.AbstractFunction2;
-import org.apache.spark.rdd.PairRDDFunctions;
-import org.apache.spark.rdd.RDD;
@SuppressWarnings({ "serial" })
public class DistinctConverter implements POConverter<Tuple, Tuple, PODistinct> {
@@ -39,9 +38,10 @@ public class DistinctConverter implement
RDD<Tuple2<Tuple, Object>> rddPairs = rdd.map(TO_KEY_VALUE_FUNCTION,
tuple2ClassManifest);
- PairRDDFunctions<Tuple, Object> pairRDDFunctions = new PairRDDFunctions<Tuple, Object>(
+ PairRDDFunctions<Tuple, Object> pairRDDFunctions
+ = new PairRDDFunctions<Tuple, Object>(
rddPairs, SparkUtil.getManifest(Tuple.class),
- SparkUtil.getManifest(Object.class));
+ SparkUtil.getManifest(Object.class), null);
int parallelism = SparkUtil.getParallelism(predecessors, poDistinct);
return pairRDDFunctions.reduceByKey(MERGE_VALUES_FUNCTION, parallelism)
.map(TO_VALUE_FUNCTION, SparkUtil.getManifest(Tuple.class));
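The DistinctConverter hunk reflects a constructor change: in Spark 1.1 the Scala PairRDDFunctions constructor gained an implicit Ordering[K], which Java callers must pass explicitly (null when no key ordering is needed). A sketch of just that call, assuming SparkUtil.getManifest is this branch's ClassTag helper as used throughout the diff:

    import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
    import org.apache.pig.data.Tuple;
    import org.apache.spark.rdd.PairRDDFunctions;
    import org.apache.spark.rdd.RDD;

    import scala.Tuple2;

    public class PairFunctionsSketch {
        static PairRDDFunctions<Tuple, Object> wrap(RDD<Tuple2<Tuple, Object>> rddPairs) {
            return new PairRDDFunctions<Tuple, Object>(
                    rddPairs,
                    SparkUtil.getManifest(Tuple.class),   // ClassTag[K]
                    SparkUtil.getManifest(Object.class),  // ClassTag[V]
                    null);                                // Ordering[K]: none
        }
    }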
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java?rev=1630768&r1=1630767&r2=1630768&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java Fri Oct 10 11:48:53 2014
@@ -1,6 +1,7 @@
package org.apache.pig.backend.hadoop.executionengine.spark.converter;
import java.io.Serializable;
+import java.util.Iterator;
import java.util.List;
import org.apache.pig.backend.executionengine.ExecException;
@@ -8,10 +9,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
-
-import scala.collection.Iterator;
-import scala.collection.JavaConversions;
-import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
/**
@@ -26,12 +24,11 @@ public class ForEachConverter implements
SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
RDD<Tuple> rdd = predecessors.get(0);
ForEachFunction forEachFunction = new ForEachFunction(physicalOperator);
- return rdd.mapPartitions(forEachFunction, true,
- SparkUtil.getManifest(Tuple.class));
+ return rdd.toJavaRDD().mapPartitions(forEachFunction, true).rdd();
}
- private static class ForEachFunction extends
- Function<Iterator<Tuple>, Iterator<Tuple>> implements Serializable {
+ private static class ForEachFunction implements
+ FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
private POForEach poForEach;
@@ -39,11 +36,14 @@ public class ForEachConverter implements
this.poForEach = poForEach;
}
- public Iterator<Tuple> call(Iterator<Tuple> i) {
- final java.util.Iterator<Tuple> input = JavaConversions
- .asJavaIterator(i);
- Iterator<Tuple> output = JavaConversions
- .asScalaIterator(new POOutputConsumerIterator(input) {
+ public Iterable<Tuple> call(final Iterator<Tuple> input) {
+
+ return new Iterable<Tuple>() {
+
+ @Override
+ public Iterator<Tuple> iterator() {
+ return new POOutputConsumerIterator(input) {
+
protected void attach(Tuple tuple) {
poForEach.setInputs(null);
poForEach.attachInput(tuple);
@@ -52,8 +52,9 @@ public class ForEachConverter implements
protected Result getNextResult() throws ExecException {
return poForEach.getNextTuple();
}
- });
- return output;
+ };
+ }
+ };
}
}
}
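ForEachConverter uses the same round-trip idiom as CollectedGroupConverter: drop from the Scala RDD into the Java API for mapPartitions, then unwrap back to a Scala RDD. A compact sketch of that idiom:

    import java.util.Iterator;

    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.rdd.RDD;

    public class RoundTripSketch {
        static <T> RDD<T> mapPartitions(RDD<T> rdd, FlatMapFunction<Iterator<T>, T> fn) {
            // toJavaRDD() is a thin wrapper, not a copy; .rdd() unwraps again.
            return rdd.toJavaRDD().mapPartitions(fn, true).rdd();
        }
    }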
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java?rev=1630768&r1=1630767&r2=1630768&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java Fri Oct 10 11:48:53 2014
@@ -14,6 +14,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.spark.HashPartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.CoGroupedRDD;
import org.apache.spark.rdd.RDD;
@@ -23,7 +25,6 @@ import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.Seq;
//import scala.reflect.ClassManifest;
-import scala.reflect.ClassTag;
@SuppressWarnings({ "serial" })
public class GlobalRearrangeConverter implements
@@ -40,7 +41,7 @@ public class GlobalRearrangeConverter im
private static final GroupTupleFunction GROUP_TUPLE_FUNCTION = new GroupTupleFunction();
private static final ToGroupKeyValueFunction TO_GROUP_KEY_VALUE_FUNCTION = new ToGroupKeyValueFunction();
- @Override
+ @Override
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
POGlobalRearrange physicalOperator) throws IOException {
SparkUtil.assertPredecessorSizeGreaterThan(predecessors,
@@ -56,25 +57,16 @@ public class GlobalRearrangeConverter im
if (predecessors.size() == 1) {
// GROUP
- return predecessors
- .get(0)
- // group by key
- .groupBy(GET_KEY_FUNCTION, parallelism,
- SparkUtil.getManifest(Object.class))
- // convert result to a tuple (key, { values })
- .map(GROUP_TUPLE_FUNCTION,
- SparkUtil.getManifest(Tuple.class));
+ JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
+ JavaPairRDD<Object, Iterable<Tuple>> prdd = jrdd.groupBy(GET_KEY_FUNCTION, parallelism);
+ JavaRDD<Tuple> jrdd2 = prdd.map(GROUP_TUPLE_FUNCTION);
+ return jrdd2.rdd();
} else {
- // COGROUP
- // each pred returns (index, key, value)
- ClassTag<Tuple2<Object, Tuple>> tuple2ClassManifest = SparkUtil
- .<Object, Tuple> getTuple2Manifest();
-
- List<RDD<Tuple2<Object, Tuple>>> rddPairs = new ArrayList();
+ List<RDD<Tuple2<Object, Tuple>>> rddPairs = new ArrayList<RDD<Tuple2<Object, Tuple>>>();
for (RDD<Tuple> rdd : predecessors) {
- RDD<Tuple2<Object, Tuple>> rddPair = rdd.map(
- TO_KEY_VALUE_FUNCTION, tuple2ClassManifest);
- rddPairs.add(rddPair);
+ JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
+ JavaRDD<Tuple2<Object, Tuple>> rddPair = jrdd.map(TO_KEY_VALUE_FUNCTION);
+ rddPairs.add(rddPair.rdd());
}
// Something's wrong with the type parameters of CoGroupedRDD
@@ -84,16 +76,14 @@ public class GlobalRearrangeConverter im
.asScalaBuffer(rddPairs).toSeq()),
new HashPartitioner(parallelism));
- RDD<Tuple2<Object, Seq<Seq<Tuple>>>> rdd = (RDD<Tuple2<Object, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
- return rdd.map(TO_GROUP_KEY_VALUE_FUNCTION,
- SparkUtil.getManifest(Tuple.class));
+ RDD<Tuple2<Object, Seq<Seq<Tuple>>>> rdd =
+ (RDD<Tuple2<Object, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
+ return rdd.toJavaRDD().map(TO_GROUP_KEY_VALUE_FUNCTION).rdd();
}
}
- private static class GetKeyFunction extends Function<Tuple, Object>
- implements Serializable {
+ private static class GetKeyFunction implements Function<Tuple, Object>, Serializable {
- @Override
public Object call(Tuple t) {
try {
LOG.debug("GetKeyFunction in " + t);
@@ -107,17 +97,15 @@ public class GlobalRearrangeConverter im
}
}
- private static class GroupTupleFunction extends
- Function<Tuple2<Object, Seq<Tuple>>, Tuple> implements Serializable {
+ private static class GroupTupleFunction implements Function<Tuple2<Object, Iterable<Tuple>>, Tuple>,
+ Serializable {
- @Override
- public Tuple call(Tuple2<Object, Seq<Tuple>> v1) {
+ public Tuple call(Tuple2<Object, Iterable<Tuple>> v1) {
try {
LOG.debug("GroupTupleFunction in " + v1);
Tuple tuple = tf.newTuple(2);
tuple.set(0, v1._1()); // the (index, key) tuple
- tuple.set(1, JavaConversions.asJavaCollection(v1._2())
- .iterator()); // the Seq<Tuple> aka bag of values
+ tuple.set(1, v1._2().iterator()); // the Seq<Tuple> aka bag of values
LOG.debug("GroupTupleFunction out " + tuple);
return tuple;
} catch (ExecException e) {
@@ -126,8 +114,8 @@ public class GlobalRearrangeConverter im
}
}
- private static class ToKeyValueFunction extends
- Function<Tuple, Tuple2<Object, Tuple>> implements Serializable {
+ private static class ToKeyValueFunction implements
+ Function<Tuple, Tuple2<Object, Tuple>>, Serializable {
@Override
public Tuple2<Object, Tuple> call(Tuple t) {
@@ -147,21 +135,21 @@ public class GlobalRearrangeConverter im
}
}
- private static class ToGroupKeyValueFunction extends
- Function<Tuple2<Object, Seq<Seq<Tuple>>>, Tuple> implements
- Serializable {
+ private static class ToGroupKeyValueFunction implements
+ Function<Tuple2<Object, Seq<Seq<Tuple>>>, Tuple>, Serializable {
@Override
public Tuple call(Tuple2<Object, Seq<Seq<Tuple>>> input) {
try {
LOG.debug("ToGroupKeyValueFunction2 in " + input);
final Object key = input._1();
- Seq<Seq<Tuple>> bags = input._2();
- Iterable<Seq<Tuple>> bagsList = JavaConversions
- .asJavaIterable(bags);
+ Object obj = input._2();
+ // XXX this is a hack for Spark 1.1.0: the type is an Array, not Seq
+ Seq<Tuple>[] bags = (Seq<Tuple>[])obj;
int i = 0;
- List<Iterator<Tuple>> tupleIterators = new ArrayList();
- for (Seq<Tuple> bag : bagsList) {
+ List<Iterator<Tuple>> tupleIterators = new ArrayList<Iterator<Tuple>>();
+ for (int j=0; j<bags.length; j++) {
+ Seq<Tuple> bag = bags[j];
Iterator<Tuple> iterator = JavaConversions
.asJavaCollection(bag).iterator();
final int index = i;
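The "XXX ... hack" comment in the hunk above flags a Spark 1.1 quirk: CoGroupedRDD is still typed as yielding (K, Seq[Seq[V]]) pairs, but the grouped value that arrives at runtime is an Array of Seqs, so the Java side goes through Object and casts before iterating. A cautious generic sketch of that workaround, generalized from the diff:

    import java.util.Iterator;

    import scala.Tuple2;
    import scala.collection.JavaConversions;
    import scala.collection.Seq;

    public class CoGroupValueSketch {
        @SuppressWarnings("unchecked")
        static <K, V> void readBags(Tuple2<K, Seq<Seq<V>>> input) {
            Object obj = input._2();
            Seq<V>[] bags = (Seq<V>[]) obj; // runtime type in Spark 1.1 is an Array
            for (Seq<V> bag : bags) {
                Iterator<V> it = JavaConversions.asJavaCollection(bag).iterator();
                while (it.hasNext()) {
                    V value = it.next(); // one co-grouped input's values
                }
            }
        }
    }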
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java?rev=1630768&r1=1630767&r2=1630768&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java Fri Oct 10 11:48:53 2014
@@ -1,7 +1,7 @@
package org.apache.pig.backend.hadoop.executionengine.spark.converter;
import java.io.IOException;
-import java.io.Serializable;
+import java.util.Iterator;
import java.util.List;
import org.apache.pig.backend.executionengine.ExecException;
@@ -9,13 +9,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
-
-import scala.collection.Iterator;
-import scala.collection.JavaConversions;
-
-import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
-import org.apache.spark.api.java.JavaSparkContext;
@SuppressWarnings({ "serial" })
public class LimitConverter implements POConverter<Tuple, Tuple, POLimit> {
@@ -26,13 +21,11 @@ public class LimitConverter implements P
SparkUtil.assertPredecessorSize(predecessors, poLimit, 1);
RDD<Tuple> rdd = predecessors.get(0);
LimitFunction limitFunction = new LimitFunction(poLimit);
- RDD<Tuple> rdd2 = rdd.coalesce(1, false);
- return rdd2.mapPartitions(limitFunction, false,
- SparkUtil.getManifest(Tuple.class));
+ RDD<Tuple> rdd2 = rdd.coalesce(1, false, null);
+ return rdd2.toJavaRDD().mapPartitions(limitFunction, false).rdd();
}
- private static class LimitFunction extends
- Function<Iterator<Tuple>, Iterator<Tuple>> implements Serializable {
+ private static class LimitFunction implements FlatMapFunction<Iterator<Tuple>, Tuple> {
private final POLimit poLimit;
@@ -41,12 +34,12 @@ public class LimitConverter implements P
}
@Override
- public Iterator<Tuple> call(Iterator<Tuple> i) {
- final java.util.Iterator<Tuple> tuples = JavaConversions
- .asJavaIterator(i);
+ public Iterable<Tuple> call(final Iterator<Tuple> tuples) {
+
+ return new Iterable<Tuple>() {
- return JavaConversions
- .asScalaIterator(new POOutputConsumerIterator(tuples) {
+ public Iterator<Tuple> iterator() {
+ return new POOutputConsumerIterator(tuples) {
protected void attach(Tuple tuple) {
poLimit.setInputs(null);
@@ -56,9 +49,9 @@ public class LimitConverter implements P
protected Result getNextResult() throws ExecException {
return poLimit.getNextTuple();
}
- });
+ };
+ }
+ };
}
-
}
-
-}
+}
\ No newline at end of file
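The coalesce change in this hunk (and in CollectedGroupConverter) follows from Spark 1.1 adding a third parameter to RDD.coalesce, to my reading an implicit Ordering in the Scala signature, which Java callers must supply explicitly. A one-method sketch:

    import org.apache.spark.rdd.RDD;

    public class CoalesceSketch {
        static <T> RDD<T> intoOnePartition(RDD<T> rdd) {
            // numPartitions=1, shuffle=false, ordering=null (no key ordering)
            return rdd.coalesce(1, false, null);
        }
    }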
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java?rev=1630768&r1=1630767&r2=1630768&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java Fri Oct 10 11:48:53 2014
@@ -140,9 +140,8 @@ public class SkewedJoinConverter impleme
}
private static class ToValueFunction
- extends
- FlatMapFunction<Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>>, Tuple>
- implements Serializable {
+ implements
+ FlatMapFunction<Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>>, Tuple>, Serializable {
private class Tuple2TransformIterable implements Iterable<Tuple> {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java?rev=1630768&r1=1630767&r2=1630768&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java Fri Oct 10 11:48:53 2014
@@ -45,9 +45,8 @@ public class SortConverter implements PO
return mapped.rdd();
}
- private static class ToValueFunction extends
- FlatMapFunction<Iterator<Tuple2<Tuple, Object>>, Tuple> implements
- Serializable {
+ private static class ToValueFunction implements
+ FlatMapFunction<Iterator<Tuple2<Tuple, Object>>, Tuple>, Serializable {
private class Tuple2TransformIterable implements Iterable<Tuple> {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java?rev=1630768&r1=1630767&r2=1630768&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java Fri Oct 10 11:48:53 2014
@@ -1,10 +1,8 @@
package org.apache.pig.backend.hadoop.executionengine.spark.converter;
import java.io.IOException;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
@@ -17,13 +15,12 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.ObjectSerializer;
-
-import scala.Tuple2;
-import scala.collection.JavaConversions;
-import scala.runtime.AbstractFunction1;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.PairRDDFunctions;
import org.apache.spark.rdd.RDD;
-import org.apache.spark.api.java.function.Function;
+
+import scala.Tuple2;
import com.google.common.collect.Lists;
@@ -48,11 +45,11 @@ public class StoreConverter implements
SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
RDD<Tuple> rdd = predecessors.get(0);
// convert back to KV pairs
- RDD<Tuple2<Text, Tuple>> rddPairs = rdd.map(FROM_TUPLE_FUNCTION,
- SparkUtil.<Text, Tuple> getTuple2Manifest());
+ JavaRDD<Tuple2<Text, Tuple>> rddPairs = rdd.toJavaRDD().map(FROM_TUPLE_FUNCTION);
+
PairRDDFunctions<Text, Tuple> pairRDDFunctions = new PairRDDFunctions<Text, Tuple>(
- rddPairs, SparkUtil.getManifest(Text.class),
- SparkUtil.getManifest(Tuple.class));
+ rddPairs.rdd(), SparkUtil.getManifest(Text.class),
+ SparkUtil.getManifest(Tuple.class), null);
JobConf storeJobConf = SparkUtil.newJobConf(pigContext);
POStore poStore = configureStorer(storeJobConf, physicalOperator);
@@ -61,7 +58,7 @@ public class StoreConverter implements
.getFileName(), Text.class, Tuple.class, PigOutputFormat.class,
storeJobConf);
- return rddPairs;
+ return rddPairs.rdd();
}
private static POStore configureStorer(JobConf jobConf,
@@ -82,8 +79,8 @@ public class StoreConverter implements
return poStore;
}
- private static class FromTupleFunction extends
- Function<Tuple, Tuple2<Text, Tuple>> implements Serializable {
+ private static class FromTupleFunction implements
+ Function<Tuple, Tuple2<Text, Tuple>> {
private static Text EMPTY_TEXT = new Text();
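Putting the StoreConverter hunks together: map to (Text, Tuple) pairs through the Java API, rewrap in PairRDDFunctions (with the new null Ordering argument), and save through the Hadoop OutputFormat. A hedged end-to-end sketch; the 'path' and 'fromTuple' parameters are stand-ins for the patch's poStore.getSFile().getFileName() and FROM_TUPLE_FUNCTION:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
    import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
    import org.apache.pig.data.Tuple;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.rdd.PairRDDFunctions;
    import org.apache.spark.rdd.RDD;

    import scala.Tuple2;

    public class StoreSketch {
        static void store(RDD<Tuple> rdd, Function<Tuple, Tuple2<Text, Tuple>> fromTuple,
                          String path, JobConf storeJobConf) {
            // Convert back to KV pairs via the Java API.
            JavaRDD<Tuple2<Text, Tuple>> rddPairs = rdd.toJavaRDD().map(fromTuple);
            PairRDDFunctions<Text, Tuple> pairs = new PairRDDFunctions<Text, Tuple>(
                    rddPairs.rdd(),
                    SparkUtil.getManifest(Text.class),
                    SparkUtil.getManifest(Tuple.class),
                    null); // Ordering[K], new in Spark 1.1
            pairs.saveAsNewAPIHadoopFile(path, Text.class, Tuple.class,
                    PigOutputFormat.class, storeJobConf);
        }
    }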