Posted to user@spark.apache.org by Subshiri S <su...@gmail.com> on 2015/07/30 08:00:06 UTC

Is it a Spark serialization bug?

Hi all, I have tried to use a lambda expression in a Spark task, and it throws a
"java.lang.IllegalArgumentException: Invalid lambda deserialization"
exception. The exception is thrown when there is code like
"transform(pRDD -> pRDD.map(t -> t._2))". The code snippet is below.

JavaPairDStream<String, Integer> aggregate = pairRDD.reduceByKey((x, y) -> x + y);
JavaDStream<Integer> con = aggregate.transform(
        (Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>) pRDD ->
                pRDD.map((Function<Tuple2<String, Integer>, Integer>) t -> t._2));

I also tried the same code with explicit Serializable intersection casts:

JavaPairDStream<String, Integer> aggregate = pairRDD.reduceByKey((x, y) -> x + y);
JavaDStream<Integer> con = aggregate.transform(
        (Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>> & Serializable) pRDD ->
                pRDD.map((Function<Tuple2<String, Integer>, Integer> & Serializable) t -> t._2));

Neither of the two options above worked. However, if I pass the object "f"
below as the argument instead of the lambda expression "t -> t._2", it works:

Function<Tuple2<String, Integer>, Integer> f = new Function<Tuple2<String, Integer>, Integer>() {
    @Override
    public Integer call(Tuple2<String, Integer> paramT1) throws Exception {
        return paramT1._2;
    }
};

May I know the right way to express these functions as lambda expressions?
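
For reference, the shape I am after is sketched below. This is only a
sketch, not verified against the exception, and "extractValue" is a
placeholder name: the idea is to give the inner lambda an explicit
target type of Spark's own Function interface (which already extends
Serializable), so only the outer transform lambda stays inline.

// Sketch only; "extractValue" is a placeholder name. Spark's
// org.apache.spark.api.java.function.Function already extends
// Serializable, so no intersection cast should be needed here.
Function<Tuple2<String, Integer>, Integer> extractValue = t -> t._2;
JavaDStream<Integer> con = aggregate.transform(pRDD -> pRDD.map(extractValue));

The full program is below.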

public static void main(String[] args) {

            Function<Tuple2<String, Integer>, Integer> f =
                    new Function<Tuple2<String, Integer>, Integer>() {
                @Override
                public Integer call(Tuple2<String, Integer> paramT1) throws Exception {
                    return paramT1._2;
                }
            };

            JavaStreamingContext ssc = JavaStreamingFactory.getInstance();

            JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 9999);
            JavaDStream<String> words = lines.flatMap(s -> Arrays.asList(s.split(" ")));
            JavaPairDStream<String, Integer> pairRDD = words.mapToPair(x -> new Tuple2<String, Integer>(x, 1));
            JavaPairDStream<String, Integer> aggregate = pairRDD.reduceByKey((x, y) -> x + y);
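
            // This transform call throws "java.lang.IllegalArgumentException:
            // Invalid lambda deserialization" at runtime.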
            JavaDStream<Integer> con = aggregate.transform(
                    (Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>) pRDD ->
                            pRDD.map((Function<Tuple2<String, Integer>, Integer>) t -> t._2));
            // JavaDStream<Integer> con = aggregate.transform(pRDD -> pRDD.map(f)); // This works.
            con.print();

            ssc.start();
            ssc.awaitTermination();


}