Posted to user@spark.apache.org by Bahubali Jain <ba...@gmail.com> on 2014/11/15 12:18:33 UTC

Help with Spark Streaming

Hi,
Trying to use Spark Streaming, but I am struggling with word count :(
I want to consolidate the output of the word count (not on a per-window basis), so
I am using updateStateByKey(), but for some reason this is not working.
The function itself is not being invoked (I do not see the sysout output on the
console).


public final class WordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {
    if (args.length < 2) {
      System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
      System.exit(1);
    }

    // Create the context with a 1 second batch size
    SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
    ssc.checkpoint("/tmp/worcount");
    // Create a JavaReceiverInputDStream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
        args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            System.err.println("Got " + s);
            return new Tuple2<String, Integer>(s, 1);
          }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        });

    wordCounts.print();

    wordCounts.updateStateByKey(new updateFunction());
    ssc.start();
    ssc.awaitTermination();
  }
}

class updateFunction implements Function2<List<Integer>, Optional<Integer>, Optional<Integer>> {

  @Override
  public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
    Integer x = new Integer(0);
    for (Integer i : values)
      x = x + i;
    Integer newSum = state.or(0) + x;  // add the new values with the previous running count to get the new count
    System.out.println("Newsum is " + newSum);
    return Optional.of(newSum);
  }
}

Re: Help with Spark Streaming

Posted by ZhangYi <yi...@thoughtworks.com>.
I guess you may not need to invoke reduceByKey() after mapToPair(), because updateStateByKey() already covers it. For your reference, here is a sample written in Scala that uses a text file stream instead of a socket:

object LocalStatefulWordCount extends App {
  val sparkConf = new SparkConf().setAppName("HdfsWordCount")
  val ssc = new StreamingContext(sparkConf, Seconds(2))

  //must set checkpoint for updateStateByKey
  //note: the checkpoint directory cannot be the source directory
  ssc.checkpoint("./checkpoint")

  val updateFunc = (values: Seq[Int], state: Option[Int]) => {
    val currentCount = values.foldLeft(0)(_ + _)
    val previousCount = state.getOrElse(0)
    Some(currentCount + previousCount)
  }

  val lines = ssc.textFileStream("/Users/twer/workspace/scala101/data")   //local directory
  val wordDstream = lines.flatMap(_.split(" ")).map(x => (x, 1))
  val statefulWordCount = wordDstream.updateStateByKey[Int](updateFunc)
  statefulWordCount.print()

  ssc.start()
  ssc.awaitTermination()
}
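
If it helps, here is a rough Java sketch of the same idea applied to your socket stream (untested, just an illustration; the class name and checkpoint path are placeholders, and it uses Guava's Optional as in your update function). Note that print() is called on the DStream returned by updateStateByKey, just like statefulWordCount.print() in the Scala version, and there is no reduceByKey step:

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import scala.Tuple2;

import com.google.common.base.Optional;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public final class StatefulWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("StatefulWordCount");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
    // updateStateByKey requires a checkpoint directory
    ssc.checkpoint("/tmp/statefulwordcount-checkpoint");

    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
        args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);

    // Split each line into words
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Arrays.asList(SPACE.split(x));
      }
    });

    // Pair each word with 1; no reduceByKey, updateStateByKey sums each batch itself
    JavaPairDStream<String, Integer> ones = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        });

    // Fold this batch's values into the running total kept as state for the key
    JavaPairDStream<String, Integer> runningCounts = ones.updateStateByKey(
        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
          @Override
          public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
            int sum = state.or(0);
            for (Integer i : values) {
              sum += i;
            }
            return Optional.of(sum);
          }
        });

    // Output operation on the stateful stream, same as statefulWordCount.print() above
    runningCounts.print();

    ssc.start();
    ssc.awaitTermination();
  }
}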




Zhang Yi / 张逸
Lead Consultant

Email: yizhang@thoughtworks.com
Telephone: +86 15023157626



Re: Help with Spark Streaming

Posted by Bahubali Jain <ba...@gmail.com>.
Hi,
Can anybody help me on this please? I haven't been able to find the problem :(

Thanks.