You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Bahubali Jain <ba...@gmail.com> on 2014/11/15 12:18:33 UTC
Help with Spark Streaming
Hi,
Trying to use spark streaming, but I am struggling with word count :(
I want to consolidate the output of the word count (not on a per-window basis), so
I am using updateStateByKey(), but for some reason this is not working.
The function itself is not being invoked (I do not see the sysout output on the
console).
public final class WordCount {
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) {
if (args.length < 2) {
System.err.println("Usage: JavaNetworkWordCount <hostname>
<port>");
System.exit(1);
}
// Create the context with a 1 second batch size
SparkConf sparkConf = new
SparkConf().setAppName("JavaNetworkWordCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new
Duration(1000));
ssc.checkpoint("/tmp/worcount");
// Create a JavaReceiverInputDStream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by
'nc')
// Note that no duplication in storage level only for running
locally.
// Replication necessary in distributed scenario for fault
tolerance.
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
args[0], Integer.parseInt(args[1]),
StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String> words = lines.flatMap(new
FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
return Lists.newArrayList(SPACE.split(x));
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
System.err.println("Got "+s);
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
wordCounts.print();
* wordCounts.updateStateByKey(new updateFunction());*
ssc.start();
ssc.awaitTermination();
}
}
class updateFunction implements Function2<List<Integer>, Optional<Integer>,
Optional<Integer>>
{
@Override public Optional<Integer> call(List<Integer> values,
Optional<Integer> state) {
Integer x = new Integer(0);
for (Integer i:values)
x = x+i;
Integer newSum = state.or(0)+x; // add the new values with the
previous running count to get the new count
System.out.println("Newsum is "+newSum);
return Optional.of(newSum);
};
}
Re: Help with Spark Streaming
Posted by ZhangYi <yi...@thoughtworks.com>.
I guess you may not need to invoke reduceByKey() after mapToPair, because updateStateByKey already covers it. For your reference, here is a sample written in Scala that uses a text file stream instead of a socket:
/**
 * Stateful word count over a text-file stream: keeps a running total per
 * word across batches using updateStateByKey.
 *
 * Uses an explicit `main` method instead of extending `App`: Spark's
 * documentation warns that subclasses of `scala.App` may not work correctly.
 */
object LocalStatefulWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HdfsWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // A checkpoint directory must be set for updateStateByKey.
    // Note: the checkpoint directory cannot be the source directory.
    ssc.checkpoint("./checkpoint")

    // New running count for a key = sum of this batch's values + previous count
    // (0 when the key has no previous state).
    val updateFunc: (Seq[Int], Option[Int]) => Option[Int] = (values, state) => {
      val currentCount = values.sum
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    val lines = ssc.textFileStream("/Users/twer/workspace/scala101/data") // local directory
    val wordDstream = lines.flatMap(_.split(" ")).map(x => (x, 1))
    val statefulWordCount = wordDstream.updateStateByKey[Int](updateFunc)
    // print() is the output operation that causes the stateful stream to be computed.
    statefulWordCount.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
Zhang Yi / 张逸
Lead Consultant
Email
yizhang@thoughtworks.com (mailto:yizhang@thoughtworks.com)
Telephone
+86 15023157626 (mailto:+86 15023157626)
Sent with Sparrow (http://www.sparrowmailapp.com/?sig)
On Sunday, November 16, 2014 at 6:19 PM, Bahubali Jain wrote:
> Hi,
> Can anybody help me on this please, haven't been able to find the problem :(
> Thanks.
> On Nov 15, 2014 4:48 PM, "Bahubali Jain" <bahubali@gmail.com (mailto:bahubali@gmail.com)> wrote:
> > Hi,
> > Trying to use spark streaming, but I am struggling with word count :(
> > I want consolidate output of the word count (not on a per window basis), so I am using updateStateByKey(), but for some reason this is not working.
> > The function it self is not being invoked(do not see the sysout output on console).
> >
> >
> > public final class WordCount {
> > private static final Pattern SPACE = Pattern.compile(" ");
> >
> > public static void main(String[] args) {
> > if (args.length < 2) {
> > System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
> > System.exit(1);
> > }
> >
> > // Create the context with a 1 second batch size
> > SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
> > JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
> > ssc.checkpoint("/tmp/worcount");
> > // Create a JavaReceiverInputDStream on target ip:port and count the
> > // words in input stream of \n delimited text (eg. generated by 'nc')
> > // Note that no duplication in storage level only for running locally.
> > // Replication necessary in distributed scenario for fault tolerance.
> > JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
> > args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
> > JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
> > @Override
> > public Iterable<String> call(String x) {
> > return Lists.newArrayList(SPACE.split(x));
> > }
> > });
> >
> > JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
> > new PairFunction<String, String, Integer>() {
> > @Override
> > public Tuple2<String, Integer> call(String s) {
> > System.err.println("Got "+s);
> > return new Tuple2<String, Integer>(s, 1);
> > }
> > }).reduceByKey(new Function2<Integer, Integer, Integer>() {
> > @Override
> > public Integer call(Integer i1, Integer i2) {
> > return i1 + i2;
> > }
> > });
> >
> > wordCounts.print();
> > wordCounts.updateStateByKey(new updateFunction());
> > ssc.start();
> > ssc.awaitTermination();
> > }
> > }
> >
> > class updateFunction implements Function2<List<Integer>, Optional<Integer>, Optional<Integer>>
> > {
> >
> > @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
> >
> > Integer x = new Integer(0);
> > for (Integer i:values)
> > x = x+i;
> > Integer newSum = state.or(0)+x; // add the new values with the previous running count to get the new count
> > System.out.println("Newsum is "+newSum);
> > return Optional.of(newSum);
> >
> > };
> >
> > }
Re: Help with Spark Streaming
Posted by Bahubali Jain <ba...@gmail.com>.
Hi,
Can anybody help me on this please, haven't been able to find the problem
:(
Thanks.
On Nov 15, 2014 4:48 PM, "Bahubali Jain" <ba...@gmail.com> wrote:
> Hi,
> Trying to use spark streaming, but I am struggling with word count :(
> I want consolidate output of the word count (not on a per window basis),
> so I am using updateStateByKey(), but for some reason this is not working.
> The function it self is not being invoked(do not see the sysout output on
> console).
>
>
> public final class WordCount {
> private static final Pattern SPACE = Pattern.compile(" ");
>
> public static void main(String[] args) {
> if (args.length < 2) {
> System.err.println("Usage: JavaNetworkWordCount <hostname>
> <port>");
> System.exit(1);
> }
>
> // Create the context with a 1 second batch size
> SparkConf sparkConf = new
> SparkConf().setAppName("JavaNetworkWordCount");
> JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
> new Duration(1000));
> ssc.checkpoint("/tmp/worcount");
> // Create a JavaReceiverInputDStream on target ip:port and count
> the
> // words in input stream of \n delimited text (eg. generated by
> 'nc')
> // Note that no duplication in storage level only for running
> locally.
> // Replication necessary in distributed scenario for fault
> tolerance.
> JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
> args[0], Integer.parseInt(args[1]),
> StorageLevels.MEMORY_AND_DISK_SER);
> JavaDStream<String> words = lines.flatMap(new
> FlatMapFunction<String, String>() {
> @Override
> public Iterable<String> call(String x) {
> return Lists.newArrayList(SPACE.split(x));
> }
> });
>
> JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
> new PairFunction<String, String, Integer>() {
> @Override
> public Tuple2<String, Integer> call(String s) {
> System.err.println("Got "+s);
> return new Tuple2<String, Integer>(s, 1);
> }
> }).reduceByKey(new Function2<Integer, Integer, Integer>() {
> @Override
> public Integer call(Integer i1, Integer i2) {
> return i1 + i2;
> }
> });
>
> wordCounts.print();
>
> * wordCounts.updateStateByKey(new updateFunction());*
> ssc.start();
> ssc.awaitTermination();
> }
> }
>
> class updateFunction implements Function2<List<Integer>,
> Optional<Integer>, Optional<Integer>>
> {
>
> @Override public Optional<Integer> call(List<Integer> values,
> Optional<Integer> state) {
>
> Integer x = new Integer(0);
> for (Integer i:values)
> x = x+i;
> Integer newSum = state.or(0)+x; // add the new values with the
> previous running count to get the new count
> System.out.println("Newsum is "+newSum);
> return Optional.of(newSum);
>
> };
>
> }
>