Posted to dev@spark.apache.org by Rachana Srivastava <Ra...@markmonitor.com> on 2015/09/11 21:09:56 UTC

New JavaRDD Inside JavaPairDStream

Hello all,

Can we create a new JavaRDD while processing a stream, from Kafka for example? The following code throws a serialization exception. Not sure if this is feasible.

  JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(5));
  JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);
  JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    public String call(Tuple2<String, String> tuple2) {
      return tuple2._2();
    }
  });
  JavaPairDStream<String, String> wordCounts = lines.mapToPair(new PairFunction<String, String, String>() {
    public Tuple2<String, String> call(String urlString) {
      String propertiesFile = "/home/cloudera/Desktop/sample/input/featurelist.properties";
      // This call() runs inside a task on an executor, but jsc (the
      // JavaSparkContext) only exists on the driver, so it cannot be used here.
      JavaRDD<String> propertiesFileRDD = jsc.textFile(propertiesFile);
      JavaPairRDD<String, String> featureKeyClassPair = propertiesFileRDD.mapToPair(
          new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String property) {
              // Split into key and value at the first '='.
              String[] kv = property.split("=", 2);
              return new Tuple2<String, String>(kv[0], kv[1]);
            }
          });
      featureKeyClassPair.count();
      // featureScore is not defined anywhere in this snippet.
      return new Tuple2<String, String>(urlString, featureScore);
    }
  });


Re: New JavaRDD Inside JavaPairDStream

Posted by Cody Koeninger <co...@koeninger.org>.
No, in general you can't make new RDDs in code running on the executors.

It looks like your properties file is a constant, so why not process it at the
beginning of the job and broadcast the result?
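A minimal sketch of that suggestion, assuming Java 8 lambdas, Spark's Java streaming API, and a properties file on the driver's local filesystem. The names `FeatureScoring`, `parseFeatureList`, `scoreUrl` and `scoreStream` are hypothetical, and because the original post never shows how `featureScore` is computed, `scoreUrl` is only a placeholder. The file is read once on the driver, the resulting map is shipped with `JavaSparkContext.broadcast`, and the function passed to `mapToPair` reads it through `Broadcast.value()` instead of calling `jsc.textFile` on an executor:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;

import scala.Tuple2;

public class FeatureScoring {

    // Parses "key=value" lines into a map. Splitting with limit 2 keeps any
    // '=' that appears inside the value; lines without '=' are skipped.
    static Map<String, String> parseFeatureList(List<String> lines) {
        Map<String, String> features = new HashMap<>();
        for (String line : lines) {
            String[] kv = line.split("=", 2);
            if (kv.length == 2) {
                features.put(kv[0].trim(), kv[1].trim());
            }
        }
        return features;
    }

    // Hypothetical placeholder for the scoring logic (not shown in the post):
    // returns the class of a feature key contained in the URL, else "none".
    // If several keys match, the map's iteration order decides which one wins.
    static String scoreUrl(String url, Map<String, String> features) {
        for (Map.Entry<String, String> e : features.entrySet()) {
            if (url.contains(e.getKey())) {
                return e.getValue();
            }
        }
        return "none";
    }

    // Called once on the driver, before the streaming context is started.
    static JavaPairDStream<String, String> scoreStream(
            JavaSparkContext jsc, JavaDStream<String> lines, String propertiesFile)
            throws IOException {
        // Read the constant properties file on the driver only.
        Map<String, String> features = parseFeatureList(
                Files.readAllLines(Paths.get(propertiesFile), StandardCharsets.UTF_8));

        // Send one read-only copy of the map to each executor.
        Broadcast<Map<String, String>> featuresBc = jsc.broadcast(features);

        // The lambda captures only the serializable Broadcast handle and never
        // touches jsc, so no RDD is created inside executor code.
        return lines.mapToPair(url -> new Tuple2<>(url, scoreUrl(url, featuresBc.value())));
    }
}
```

In the original code this would be called as `scoreStream(jssc.sparkContext(), lines, propertiesFile)` before `jssc.start()`. If the file lives on HDFS rather than on the driver's disk, `parseFeatureList(jsc.textFile(propertiesFile).collect())` on the driver serves the same purpose, since `collect()` brings the lines back to the driver before anything is broadcast.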

