You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2014/10/10 13:48:54 UTC

svn commit: r1630768 - in /pig/branches/spark: ./ ivy/ src/org/apache/pig/backend/hadoop/executionengine/spark/converter/

Author: praveen
Date: Fri Oct 10 11:48:53 2014
New Revision: 1630768

URL: http://svn.apache.org/r1630768
Log:
PIG-4173: Upgrading spark version to 1.1.0 (richard ding via praveen)

Modified:
    pig/branches/spark/ivy.xml
    pig/branches/spark/ivy/libraries.properties
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java

Modified: pig/branches/spark/ivy.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/ivy.xml?rev=1630768&r1=1630767&r2=1630768&view=diff
==============================================================================
--- pig/branches/spark/ivy.xml (original)
+++ pig/branches/spark/ivy.xml Fri Oct 10 11:48:53 2014
@@ -435,9 +435,17 @@
     <dependency org="com.twitter" name="parquet-pig-bundle" rev="${parquet-pig-bundle.version}" conf="compile->master"/>
 
     <!-- for Spark integration -->
-    <dependency org="org.apache.spark" name="spark-core_2.10" rev="0.9.0-incubating" conf="compile->default">
+    <dependency org="org.apache.spark" name="spark-core_2.10" rev="${spark.version}" conf="compile->default">
         <exclude org="org.eclipse.jetty.orbit" module="javax.servlet"/>
+        <exclude org="org.eclipse.jetty.orbit" module="javax.transaction"/>
+        <exclude org="org.eclipse.jetty.orbit" module="javax.mail.glassfish"/>
+        <exclude org="org.eclipse.jetty.orbit" module="javax.activation"/>
     </dependency>
+    <dependency org="asm" name="asm" rev="${asm.version}" conf="compile->master">
+    </dependency>
+    <dependency org="javax.servlet" name="javax.servlet-api" rev="3.0.1"
+             conf="compile->master"/>
+
 
     <!-- for Tez integration -->
     <dependency org="org.apache.tez" name="tez" rev="${tez.version}"

Modified: pig/branches/spark/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/branches/spark/ivy/libraries.properties?rev=1630768&r1=1630767&r2=1630768&view=diff
==============================================================================
--- pig/branches/spark/ivy/libraries.properties (original)
+++ pig/branches/spark/ivy/libraries.properties Fri Oct 10 11:48:53 2014
@@ -17,6 +17,7 @@
 accumulo15.version=1.5.0
 apacheant.version=1.7.1
 apacherat.version=0.8
+asm.version=3.2
 automaton.version=1.11-8
 avro.version=1.7.5
 commons-beanutils.version=1.7.0
@@ -76,6 +77,7 @@ netty.version=3.2.2
 rats-lib.version=0.5.1
 slf4j-api.version=1.6.1
 slf4j-log4j12.version=1.6.1
+spark.version=1.1.0
 xerces.version=2.10.0
 xalan.version=2.7.1
 wagon-http.version=1.0-beta-2

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java?rev=1630768&r1=1630767&r2=1630768&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java Fri Oct 10 11:48:53 2014
@@ -1,21 +1,15 @@
 package org.apache.pig.backend.hadoop.executionengine.spark.converter;
 
 import java.io.IOException;
-import java.io.Serializable;
+import java.util.Iterator;
 import java.util.List;
 
-import org.apache.pig.Main;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
-import org.apache.pig.backend.hadoop.executionengine.spark.BroadCastServer;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.plan.PlanException;
-
-import scala.collection.Iterator;
-import scala.collection.JavaConversions;
-import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.rdd.RDD;
 
 
@@ -23,28 +17,29 @@ import org.apache.spark.rdd.RDD;
 public class CollectedGroupConverter implements POConverter<Tuple, Tuple, POCollectedGroup> {
 
 	@Override
-	public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POCollectedGroup physicalOperator) throws IOException {
-		SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
-		RDD<Tuple> rdd = predecessors.get(0);
-		//return predecessors.get(0);
-		RDD<Tuple> rdd2 = rdd.coalesce(1, false);
-		long count = 0;
-		try{
-
-			count = rdd2.count();
-			long ccount = 0;
-
-		}catch(Exception e){
-
-		}
-		CollectedGroupFunction collectedGroupFunction = new CollectedGroupFunction(physicalOperator, count);
-		return rdd.mapPartitions(collectedGroupFunction, true, SparkUtil.getManifest(Tuple.class));
-	}
+  public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+      POCollectedGroup physicalOperator) throws IOException {
+    SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
+    RDD<Tuple> rdd = predecessors.get(0);
+    // return predecessors.get(0);
+    RDD<Tuple> rdd2 = rdd.coalesce(1, false, null);
+    long count = 0;
+    try {
+
+      count = rdd2.count();
+
+    } catch (Exception e) {
+
+    }
+    CollectedGroupFunction collectedGroupFunction
+        = new CollectedGroupFunction(physicalOperator, count);
+    return rdd.toJavaRDD().mapPartitions(collectedGroupFunction, true).rdd();
+  }
 
-	private static class CollectedGroupFunction extends Function<Iterator<Tuple>, Iterator<Tuple>> implements Serializable {
+	private static class CollectedGroupFunction implements FlatMapFunction<Iterator<Tuple>, Tuple> {
 
 		/**
-		 * 
+		 *
 		 */
 		private POCollectedGroup poCollectedGroup;
 
@@ -56,38 +51,42 @@ public class CollectedGroupConverter imp
 			this.poCollectedGroup = poCollectedGroup;
 			this.total_limit = count;
 			this.current_val = 0;
-
 		}
 
-		public Iterator<Tuple> call(Iterator<Tuple> i) {
-			final java.util.Iterator<Tuple> input = JavaConversions.asJavaIterator(i);
-			Iterator<Tuple> output = JavaConversions.asScalaIterator(new POOutputConsumerIterator(input) {
-				protected void attach(Tuple tuple) {
-					poCollectedGroup.setInputs(null);
-					poCollectedGroup.attachInput(tuple);
-					poCollectedGroup.setParentPlan(poCollectedGroup.getPlans().get(0));
-					try{
-
-						current_val = current_val + 1;
-						//System.out.println("Row: =>" + current_val);
-						if(current_val == total_limit){
-							proceed = true;
-						}else{
-							proceed = false;
-						}
-
-					}catch(Exception e){
-						System.out.println("Crashhh in CollectedGroupConverter :" + e);
-						e.printStackTrace();
-					}					
-				}
-
-				protected Result getNextResult() throws ExecException {
-					return poCollectedGroup.getNextTuple(proceed);
-				}
-			});
-			return output;
+		public Iterable<Tuple> call(final Iterator<Tuple> input) {
+
+		  return new Iterable<Tuple>() {
+
+		    @Override
+		    public Iterator<Tuple> iterator() {
+		      return new POOutputConsumerIterator(input) {
+		        protected void attach(Tuple tuple) {
+		          poCollectedGroup.setInputs(null);
+		          poCollectedGroup.attachInput(tuple);
+		          poCollectedGroup.setParentPlan(poCollectedGroup.getPlans().get(0));
+
+		          try{
+
+		            current_val = current_val + 1;
+		            //System.out.println("Row: =>" + current_val);
+		            if (current_val == total_limit) {
+		              proceed = true;
+		            } else {
+		              proceed = false;
+		            }
+
+		          } catch(Exception e){
+		            System.out.println("Crashhh in CollectedGroupConverter :" + e);
+		            e.printStackTrace();
+		          }
+		        }
+
+		        protected Result getNextResult() throws ExecException {
+		          return poCollectedGroup.getNextTuple(proceed);
+		        }
+		      };
+		    }
+      };
 		}
 	}
-
 }
\ No newline at end of file

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java?rev=1630768&r1=1630767&r2=1630768&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java Fri Oct 10 11:48:53 2014
@@ -9,16 +9,15 @@ import org.apache.commons.logging.LogFac
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.data.Tuple;
+import org.apache.spark.rdd.PairRDDFunctions;
+import org.apache.spark.rdd.RDD;
 
 import scala.Function1;
 import scala.Function2;
 import scala.Tuple2;
-//import scala.reflect.ClassManifest;
 import scala.reflect.ClassTag;
 import scala.runtime.AbstractFunction1;
 import scala.runtime.AbstractFunction2;
-import org.apache.spark.rdd.PairRDDFunctions;
-import org.apache.spark.rdd.RDD;
 
 @SuppressWarnings({ "serial" })
 public class DistinctConverter implements POConverter<Tuple, Tuple, PODistinct> {
@@ -39,9 +38,10 @@ public class DistinctConverter implement
 
         RDD<Tuple2<Tuple, Object>> rddPairs = rdd.map(TO_KEY_VALUE_FUNCTION,
                 tuple2ClassManifest);
-        PairRDDFunctions<Tuple, Object> pairRDDFunctions = new PairRDDFunctions<Tuple, Object>(
+        PairRDDFunctions<Tuple, Object> pairRDDFunctions
+          = new PairRDDFunctions<Tuple, Object>(
                 rddPairs, SparkUtil.getManifest(Tuple.class),
-                SparkUtil.getManifest(Object.class));
+                SparkUtil.getManifest(Object.class), null);
         int parallelism = SparkUtil.getParallelism(predecessors, poDistinct);
         return pairRDDFunctions.reduceByKey(MERGE_VALUES_FUNCTION, parallelism)
                 .map(TO_VALUE_FUNCTION, SparkUtil.getManifest(Tuple.class));

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java?rev=1630768&r1=1630767&r2=1630768&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java Fri Oct 10 11:48:53 2014
@@ -1,6 +1,7 @@
 package org.apache.pig.backend.hadoop.executionengine.spark.converter;
 
 import java.io.Serializable;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.pig.backend.executionengine.ExecException;
@@ -8,10 +9,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.data.Tuple;
-
-import scala.collection.Iterator;
-import scala.collection.JavaConversions;
-import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.rdd.RDD;
 
 /**
@@ -26,12 +24,11 @@ public class ForEachConverter implements
         SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
         RDD<Tuple> rdd = predecessors.get(0);
         ForEachFunction forEachFunction = new ForEachFunction(physicalOperator);
-        return rdd.mapPartitions(forEachFunction, true,
-                SparkUtil.getManifest(Tuple.class));
+        return rdd.toJavaRDD().mapPartitions(forEachFunction, true).rdd();
     }
 
-    private static class ForEachFunction extends
-            Function<Iterator<Tuple>, Iterator<Tuple>> implements Serializable {
+    private static class ForEachFunction implements
+            FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
 
         private POForEach poForEach;
 
@@ -39,11 +36,14 @@ public class ForEachConverter implements
             this.poForEach = poForEach;
         }
 
-        public Iterator<Tuple> call(Iterator<Tuple> i) {
-            final java.util.Iterator<Tuple> input = JavaConversions
-                    .asJavaIterator(i);
-            Iterator<Tuple> output = JavaConversions
-                    .asScalaIterator(new POOutputConsumerIterator(input) {
+        public Iterable<Tuple> call(final Iterator<Tuple> input) {
+
+            return new Iterable<Tuple>() {
+
+                @Override
+                public Iterator<Tuple> iterator() {
+                    return new POOutputConsumerIterator(input) {
+
                         protected void attach(Tuple tuple) {
                             poForEach.setInputs(null);
                             poForEach.attachInput(tuple);
@@ -52,8 +52,9 @@ public class ForEachConverter implements
                         protected Result getNextResult() throws ExecException {
                             return poForEach.getNextTuple();
                         }
-                    });
-            return output;
+                    };
+                }
+            };
         }
     }
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java?rev=1630768&r1=1630767&r2=1630768&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java Fri Oct 10 11:48:53 2014
@@ -14,6 +14,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.spark.HashPartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.rdd.CoGroupedRDD;
 import org.apache.spark.rdd.RDD;
@@ -23,7 +25,6 @@ import scala.Tuple2;
 import scala.collection.JavaConversions;
 import scala.collection.Seq;
 //import scala.reflect.ClassManifest;
-import scala.reflect.ClassTag;
 
 @SuppressWarnings({ "serial" })
 public class GlobalRearrangeConverter implements
@@ -40,7 +41,7 @@ public class GlobalRearrangeConverter im
     private static final GroupTupleFunction GROUP_TUPLE_FUNCTION = new GroupTupleFunction();
     private static final ToGroupKeyValueFunction TO_GROUP_KEY_VALUE_FUNCTION = new ToGroupKeyValueFunction();
 
-    @Override
+  @Override
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
             POGlobalRearrange physicalOperator) throws IOException {
         SparkUtil.assertPredecessorSizeGreaterThan(predecessors,
@@ -56,25 +57,16 @@ public class GlobalRearrangeConverter im
 
         if (predecessors.size() == 1) {
             // GROUP
-            return predecessors
-                    .get(0)
-                    // group by key
-                    .groupBy(GET_KEY_FUNCTION, parallelism,
-                            SparkUtil.getManifest(Object.class))
-                    // convert result to a tuple (key, { values })
-                    .map(GROUP_TUPLE_FUNCTION,
-                            SparkUtil.getManifest(Tuple.class));
+            JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
+            JavaPairRDD<Object, Iterable<Tuple>> prdd = jrdd.groupBy(GET_KEY_FUNCTION, parallelism);
+            JavaRDD<Tuple> jrdd2 = prdd.map(GROUP_TUPLE_FUNCTION);
+            return jrdd2.rdd();
         } else {
-            // COGROUP
-            // each pred returns (index, key, value)
-            ClassTag<Tuple2<Object, Tuple>> tuple2ClassManifest = SparkUtil
-                    .<Object, Tuple> getTuple2Manifest();
-
-            List<RDD<Tuple2<Object, Tuple>>> rddPairs = new ArrayList();
+            List<RDD<Tuple2<Object, Tuple>>> rddPairs = new ArrayList<RDD<Tuple2<Object, Tuple>>>();
             for (RDD<Tuple> rdd : predecessors) {
-                RDD<Tuple2<Object, Tuple>> rddPair = rdd.map(
-                        TO_KEY_VALUE_FUNCTION, tuple2ClassManifest);
-                rddPairs.add(rddPair);
+                JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
+                JavaRDD<Tuple2<Object, Tuple>> rddPair = jrdd.map(TO_KEY_VALUE_FUNCTION);
+                rddPairs.add(rddPair.rdd());
             }
 
             // Something's wrong with the type parameters of CoGroupedRDD
@@ -84,16 +76,14 @@ public class GlobalRearrangeConverter im
                             .asScalaBuffer(rddPairs).toSeq()),
                     new HashPartitioner(parallelism));
 
-            RDD<Tuple2<Object, Seq<Seq<Tuple>>>> rdd = (RDD<Tuple2<Object, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
-            return rdd.map(TO_GROUP_KEY_VALUE_FUNCTION,
-                    SparkUtil.getManifest(Tuple.class));
+            RDD<Tuple2<Object, Seq<Seq<Tuple>>>> rdd =
+                (RDD<Tuple2<Object, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
+            return rdd.toJavaRDD().map(TO_GROUP_KEY_VALUE_FUNCTION).rdd();
         }
     }
 
-    private static class GetKeyFunction extends Function<Tuple, Object>
-            implements Serializable {
+    private static class GetKeyFunction implements Function<Tuple, Object>, Serializable {
 
-        @Override
         public Object call(Tuple t) {
             try {
                 LOG.debug("GetKeyFunction in " + t);
@@ -107,17 +97,15 @@ public class GlobalRearrangeConverter im
         }
     }
 
-    private static class GroupTupleFunction extends
-            Function<Tuple2<Object, Seq<Tuple>>, Tuple> implements Serializable {
+    private static class GroupTupleFunction implements Function<Tuple2<Object, Iterable<Tuple>>, Tuple>,
+        Serializable {
 
-        @Override
-        public Tuple call(Tuple2<Object, Seq<Tuple>> v1) {
+        public Tuple call(Tuple2<Object, Iterable<Tuple>> v1) {
             try {
                 LOG.debug("GroupTupleFunction in " + v1);
                 Tuple tuple = tf.newTuple(2);
                 tuple.set(0, v1._1()); // the (index, key) tuple
-                tuple.set(1, JavaConversions.asJavaCollection(v1._2())
-                        .iterator()); // the Seq<Tuple> aka bag of values
+                tuple.set(1, v1._2().iterator()); // the Seq<Tuple> aka bag of values
                 LOG.debug("GroupTupleFunction out " + tuple);
                 return tuple;
             } catch (ExecException e) {
@@ -126,8 +114,8 @@ public class GlobalRearrangeConverter im
         }
     }
 
-    private static class ToKeyValueFunction extends
-            Function<Tuple, Tuple2<Object, Tuple>> implements Serializable {
+    private static class ToKeyValueFunction implements
+            Function<Tuple, Tuple2<Object, Tuple>>, Serializable {
 
         @Override
         public Tuple2<Object, Tuple> call(Tuple t) {
@@ -147,21 +135,21 @@ public class GlobalRearrangeConverter im
         }
     }
 
-    private static class ToGroupKeyValueFunction extends
-            Function<Tuple2<Object, Seq<Seq<Tuple>>>, Tuple> implements
-            Serializable {
+    private static class ToGroupKeyValueFunction implements
+            Function<Tuple2<Object, Seq<Seq<Tuple>>>, Tuple>, Serializable {
 
         @Override
         public Tuple call(Tuple2<Object, Seq<Seq<Tuple>>> input) {
             try {
                 LOG.debug("ToGroupKeyValueFunction2 in " + input);
                 final Object key = input._1();
-                Seq<Seq<Tuple>> bags = input._2();
-                Iterable<Seq<Tuple>> bagsList = JavaConversions
-                        .asJavaIterable(bags);
+                Object obj = input._2();
+                // XXX this is a hack for Spark 1.1.0: the type is an Array, not Seq
+                Seq<Tuple>[] bags = (Seq<Tuple>[])obj;
                 int i = 0;
-                List<Iterator<Tuple>> tupleIterators = new ArrayList();
-                for (Seq<Tuple> bag : bagsList) {
+                List<Iterator<Tuple>> tupleIterators = new ArrayList<Iterator<Tuple>>();
+                for (int j=0; j<bags.length; j++) {
+                    Seq<Tuple> bag = bags[j];
                     Iterator<Tuple> iterator = JavaConversions
                             .asJavaCollection(bag).iterator();
                     final int index = i;

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java?rev=1630768&r1=1630767&r2=1630768&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java Fri Oct 10 11:48:53 2014
@@ -1,7 +1,7 @@
 package org.apache.pig.backend.hadoop.executionengine.spark.converter;
 
 import java.io.IOException;
-import java.io.Serializable;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.pig.backend.executionengine.ExecException;
@@ -9,13 +9,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.data.Tuple;
-
-import scala.collection.Iterator;
-import scala.collection.JavaConversions;
-
-import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.rdd.RDD;
-import org.apache.spark.api.java.JavaSparkContext;
 
 @SuppressWarnings({ "serial" })
 public class LimitConverter implements POConverter<Tuple, Tuple, POLimit> {
@@ -26,13 +21,11 @@ public class LimitConverter implements P
         SparkUtil.assertPredecessorSize(predecessors, poLimit, 1);
         RDD<Tuple> rdd = predecessors.get(0);
         LimitFunction limitFunction = new LimitFunction(poLimit);
-        RDD<Tuple> rdd2 = rdd.coalesce(1, false);
-        return rdd2.mapPartitions(limitFunction, false,
-                SparkUtil.getManifest(Tuple.class));
+        RDD<Tuple> rdd2 = rdd.coalesce(1, false, null);
+        return rdd2.toJavaRDD().mapPartitions(limitFunction, false).rdd();
     }
 
-    private static class LimitFunction extends
-            Function<Iterator<Tuple>, Iterator<Tuple>> implements Serializable {
+    private static class LimitFunction implements FlatMapFunction<Iterator<Tuple>, Tuple> {
 
         private final POLimit poLimit;
 
@@ -41,12 +34,12 @@ public class LimitConverter implements P
         }
 
         @Override
-        public Iterator<Tuple> call(Iterator<Tuple> i) {
-            final java.util.Iterator<Tuple> tuples = JavaConversions
-                    .asJavaIterator(i);
+        public Iterable<Tuple> call(final Iterator<Tuple> tuples) {
+
+            return new Iterable<Tuple>() {
 
-            return JavaConversions
-                    .asScalaIterator(new POOutputConsumerIterator(tuples) {
+                public Iterator<Tuple> iterator() {
+                    return new POOutputConsumerIterator(tuples) {
 
                         protected void attach(Tuple tuple) {
                             poLimit.setInputs(null);
@@ -56,9 +49,9 @@ public class LimitConverter implements P
                         protected Result getNextResult() throws ExecException {
                             return poLimit.getNextTuple();
                         }
-                    });
+                    };
+                }
+            };
         }
-
     }
-
-}
+}
\ No newline at end of file

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java?rev=1630768&r1=1630767&r2=1630768&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java Fri Oct 10 11:48:53 2014
@@ -140,9 +140,8 @@ public class SkewedJoinConverter impleme
     }
 
     private static class ToValueFunction
-            extends
-            FlatMapFunction<Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>>, Tuple>
-            implements Serializable {
+            implements
+            FlatMapFunction<Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>>, Tuple>, Serializable {
 
         private class Tuple2TransformIterable implements Iterable<Tuple> {
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java?rev=1630768&r1=1630767&r2=1630768&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java Fri Oct 10 11:48:53 2014
@@ -45,9 +45,8 @@ public class SortConverter implements PO
         return mapped.rdd();
     }
 
-    private static class ToValueFunction extends
-            FlatMapFunction<Iterator<Tuple2<Tuple, Object>>, Tuple> implements
-            Serializable {
+    private static class ToValueFunction implements
+            FlatMapFunction<Iterator<Tuple2<Tuple, Object>>, Tuple>, Serializable {
 
         private class Tuple2TransformIterable implements Iterable<Tuple> {
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java?rev=1630768&r1=1630767&r2=1630768&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java Fri Oct 10 11:48:53 2014
@@ -1,10 +1,8 @@
 package org.apache.pig.backend.hadoop.executionengine.spark.converter;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
@@ -17,13 +15,12 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.ObjectSerializer;
-
-import scala.Tuple2;
-import scala.collection.JavaConversions;
-import scala.runtime.AbstractFunction1;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
 import org.apache.spark.rdd.PairRDDFunctions;
 import org.apache.spark.rdd.RDD;
-import org.apache.spark.api.java.function.Function;
+
+import scala.Tuple2;
 
 import com.google.common.collect.Lists;
 
@@ -48,11 +45,11 @@ public class StoreConverter implements
         SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
         RDD<Tuple> rdd = predecessors.get(0);
         // convert back to KV pairs
-        RDD<Tuple2<Text, Tuple>> rddPairs = rdd.map(FROM_TUPLE_FUNCTION,
-                SparkUtil.<Text, Tuple> getTuple2Manifest());
+        JavaRDD<Tuple2<Text, Tuple>> rddPairs = rdd.toJavaRDD().map(FROM_TUPLE_FUNCTION);
+
         PairRDDFunctions<Text, Tuple> pairRDDFunctions = new PairRDDFunctions<Text, Tuple>(
-                rddPairs, SparkUtil.getManifest(Text.class),
-                SparkUtil.getManifest(Tuple.class));
+                rddPairs.rdd(), SparkUtil.getManifest(Text.class),
+                SparkUtil.getManifest(Tuple.class), null);
 
         JobConf storeJobConf = SparkUtil.newJobConf(pigContext);
         POStore poStore = configureStorer(storeJobConf, physicalOperator);
@@ -61,7 +58,7 @@ public class StoreConverter implements
                 .getFileName(), Text.class, Tuple.class, PigOutputFormat.class,
                 storeJobConf);
 
-        return rddPairs;
+        return rddPairs.rdd();
     }
 
     private static POStore configureStorer(JobConf jobConf,
@@ -82,8 +79,8 @@ public class StoreConverter implements
         return poStore;
     }
 
-    private static class FromTupleFunction extends
-            Function<Tuple, Tuple2<Text, Tuple>> implements Serializable {
+    private static class FromTupleFunction implements
+            Function<Tuple, Tuple2<Text, Tuple>> {
 
         private static Text EMPTY_TEXT = new Text();