Posted to user@spark.apache.org by Daniel Haviv <da...@veracity-group.com> on 2015/03/08 17:22:29 UTC

using sparkContext from within a map function (from spark streaming app)

Hi,
We are designing a solution that pulls file paths from Kafka and, for the
current stage, just counts the lines in each of these files.
When we run the code it fails with:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
        at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:444)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$class.mapToPair(JavaDStreamLike.scala:146)
        at org.apache.spark.streaming.api.java.JavaPairDStream.mapToPair(JavaPairDStream.scala:46)
        at streamReader.App.main(App.java:66)

Is using the sparkContext from inside a map function wrong?

This is the code we are using:
        SparkConf conf = new SparkConf()
            .setAppName("Simple Application")
            .setMaster("spark://namenode:7077");

        // KAFKA
        final JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put("uploadedFiles", 1);
        JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, "localhost:2181", "group3", topicMap);

        JavaDStream<String> files = messages.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        JavaPairDStream<String, Integer> pairs = messages.mapToPair(
            new PairFunction<Tuple2<String, String>, String, Integer>() {
                public Tuple2<String, Integer> call(Tuple2<String, String> word) throws Exception {
                    JavaRDD<String> textfile = jssc.sparkContext().textFile(word._2());
                    int test = new Long(textfile.count()).intValue();
                    return new Tuple2<String, Integer>(word._2(), test);
                }
            });

        System.out.println("Printing Messages:");
        pairs.print();

        jssc.start();
        jssc.awaitTermination();
        jssc.close();

Thanks,
Daniel

Re: using sparkContext from within a map function (from spark streaming app)

Posted by Sean Owen <so...@cloudera.com>.
Yes, you can never use the SparkContext inside a remote function. It
is on the driver only.
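
For example, here is a minimal sketch of one way around it (not from the
original thread; it reuses the files DStream and the final jssc variable from
the code above, and assumes the Spark 1.x Java API where foreachRDD takes a
Function returning Void):

        // assumes imports: org.apache.spark.api.java.JavaRDD,
        // org.apache.spark.api.java.function.Function, java.util.List

        // Hypothetical rework of the line-counting step. foreachRDD runs its
        // function on the driver, so the SparkContext is available there.
        files.foreachRDD(new Function<JavaRDD<String>, Void>() {
            public Void call(JavaRDD<String> rdd) throws Exception {
                // collect() brings this batch's file paths back to the driver
                List<String> paths = rdd.collect();
                for (String path : paths) {
                    // driver-side call, so no closure has to serialize the context
                    long lines = jssc.sparkContext().textFile(path).count();
                    System.out.println(path + " : " + lines);
                }
                return null;
            }
        });

The general pattern: anything that needs the SparkContext (creating RDDs,
broadcasting, etc.) has to happen in driver-side code such as foreachRDD or
transform, while the functions passed to map/mapToPair receive only
serialized data on the executors.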

