Posted to dev@spark.apache.org by Eduardo Costa Alfaia <e....@unibs.it> on 2014/02/04 00:19:57 UTC

Re: Source code JavaNetworkWordcount

Hi Tathagata,

You were right to suggest that I use Scala instead of Java; Scala is very
easy. I have implemented the code you gave me (the words.count() block
below), and I have also added a union (marked in the comments) because I am
testing with 2 stream sources. My idea is to use 3 or more stream sources
and take their union.

package org.apache.spark.streaming.examples

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
        "In local mode, <master> should be 'local[n]' with n > 1")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
    ssc.checkpoint("hdfs://computer22:54310/user/root/INPUT")

    // Create socket text streams on target ip:port and count the
    // words in the input stream of \n delimited text (eg. generated by 'nc')
    val lines1 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY_SER)
    val lines2 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY_SER)
    // Added: union of the two sources
    val union2 = lines1.union(lines2)
    //val words = lines.flatMap(_.split(" "))
    val words = union2.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    // Code from Tathagata's reply: total number of words per batch
    words.count().foreachRDD(rdd => {
      val totalCount = rdd.first()

      // print to screen
      println(totalCount)

      // append count to file
      //  ...
    })
    //wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
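The idea of taking the union of 3 or more sources does not require chaining union calls by hand. The sketch below is only a model, assuming plain Scala collections stand in for the lines each source delivers in one batch; UnionSketch, unionAll and the sample lines are hypothetical names chosen for illustration. On real DStreams, StreamingContext.union(Seq(lines1, lines2, lines3)) plays the same role as unionAll here.

```scala
object UnionSketch {
  // Each inner Seq models the lines one source delivered in a single batch.
  // (Assumption: plain collections stand in for DStreams.)
  def unionAll(sources: Seq[Seq[String]]): Seq[String] =
    sources.foldLeft(Seq.empty[String])(_ ++ _)

  // Split every line on single spaces, as the example above does.
  def words(lines: Seq[String]): Seq[String] =
    lines.flatMap(_.split(" "))

  def main(args: Array[String]): Unit = {
    val source1 = Seq("in a village", "of la mancha")
    val source2 = Seq("the name of which")
    val source3 = Seq("i have no desire", "to call to mind")
    val all = unionAll(Seq(source1, source2, source3))
    println(words(all).size) // total words across all three sources
  }
}
```

Because the fold works for any number of sources, adding a fourth source only means adding one more element to the Seq.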

What do you think? Is my code right?

I obtained the following result:

root@computer8:/opt/unibs_test/incubator-spark-tdas# bin/run-example org.apache.spark.streaming.examples.NetworkWordCount spark://192.168.0.13:7077
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/unibs_test/incubator-spark-tdas/examples/target/scala-2.10/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/unibs_test/incubator-spark-tdas/assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/02/04 00:02:07 INFO StreamingExamples: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/02/04 00:02:07 INFO StreamingExamples: Setting log level to [WARN] for streaming example. To override add a custom log4j.properties to the classpath.
0
0
0
0
0
0
0
0
0
0
0
0
90715
1375825
882490
941226
811032
734399
804453
718688
1058695
854417
813263
798885
785455
952804
780140
697533
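Each number printed above is the word count of a single one-second batch, not a total for the whole file. A running total, together with the "append count to file" step left open in the code, could look roughly like the sketch below. It is an illustration under assumptions: RunningTotal, appendCount, the temporary file and the sample counts are all hypothetical, and it uses plain java.io rather than any Spark facility.

```scala
import java.io.{File, FileWriter, PrintWriter}

object RunningTotal {
  // Append one count per line to the given file (the path is hypothetical).
  def appendCount(path: String, count: Long): Unit = {
    val out = new PrintWriter(new FileWriter(path, true)) // true = append mode
    try out.println(count) finally out.close()
  }

  // Turn per-batch counts into cumulative totals.
  def cumulative(batchCounts: Seq[Long]): Seq[Long] =
    batchCounts.scanLeft(0L)(_ + _).tail

  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("word-counts", ".txt")
    val batches = Seq(0L, 3L, 5L, 2L) // made-up per-batch counts
    cumulative(batches).foreach(c => appendCount(file.getPath, c))
    println(scala.io.Source.fromFile(file).getLines().mkString(","))
  }
}
```

In the streaming program, a call like appendCount would sit where the "append count to file" comment is, since the body of the foreachRDD closure runs in the driver program.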


Thanks Tathagata.

Regards


2014-01-30 Eduardo Costa Alfaia <e....@unibs.it>:

> Hi Tathagata,
>
> Thank you for your explanations; they will help me understand how this
> piece of code works so that we can do what we want. We have written a C
> program that sends a text file, for example Don Quixote, as a stream over
> the network, so we have changed the Java code in JavaNetworkWordcount to
> connect to each source described in the source code. Below is what we have
> inserted: three stream sources.
>
>       JavaDStream<String> lines1 = ssc1.socketTextStream("localhost", Integer.parseInt("12345"));
>       JavaDStream<String> lines2 = ssc1.socketTextStream("localhost", Integer.parseInt("12345"));
>       JavaDStream<String> lines3 = ssc1.socketTextStream("localhost", Integer.parseInt("12345"));
>       JavaDStream<String> union2 = lines1.union(lines2);
>       JavaDStream<String> union3 = union2.union(lines3);
>       JavaDStream<String> words = union3.flatMap(new FlatMapFunction<String, String>() {
>
> So I think the second option you gave me is the better one. Sorry for my
> insistence on this, Tathagata, and thank you for your patience.
>
> Best Regards
>
>
> 2014-01-30 Tathagata Das <ta...@gmail.com>
>
>> Let me first ask for a few clarifications.
>>
>> 1. If you just want to count the words in a single text file like Don
>> Quixote (that is, not for a stream of data), you should use only Spark.
>> If you are not super-comfortable with Java, then I strongly recommend
>> using the Scala API or pyspark. Scala may be a little trickier to learn
>> if you have never used it, but it is worth it. In Scala, the program to
>> count the frequency of words in a text file would look like this.
>>
>> val sc = new SparkContext(...)
>> val linesInFile = sc.textFile("path_to_file")
>> val words = linesInFile.flatMap(line => line.split(" "))
>> val frequencies = words.map(word => (word, 1L)).reduceByKey(_ + _)
>> // collect is costly if the file is large
>> println("Word frequencies = " + frequencies.collect().mkString(", "))
>>
>>
>> 2. Let me assume that you want to read a stream of text over the
>> network and then print the total number of words into a file. Note
>> that it is "total number of words" and not "frequency of each word". The
>> Java version would be something like this.
>>
>> JavaDStream<Long> totalCounts = words.count();
>>
>> totalCounts.foreachRDD(new Function2<JavaRDD<Long>, Time, Void>() {
>>     @Override
>>     public Void call(JavaRDD<Long> rdd, Time time) throws Exception {
>>         // count() yields one element per batch: the number of words
>>         Long totalCount = rdd.first();
>>
>>         // print to screen
>>         System.out.println(totalCount);
>>
>>         // append count to file
>>         ...
>>         return null;
>>     }
>> });
>>
>> This counts how many words have been received in each batch. The Scala
>> version would be much simpler to read.
>>
>> words.count().foreachRDD(rdd => {
>>     val totalCount = rdd.first()
>>
>>     // print to screen
>>     println(totalCount)
>>
>>     // append count to file
>>     ...
>> })
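The distinction drawn above between the total number of words and the frequency of each word can be shown on a small in-memory input. This is only a sketch on plain Scala collections, not Spark code; TotalVsFrequency and the sample line are made up for illustration, and the filter on empty strings is an addition that the code in the thread does not have.

```scala
object TotalVsFrequency {
  // Split on single spaces and drop empty strings (e.g. from blank lines).
  def words(lines: Seq[String]): Seq[String] =
    lines.flatMap(_.split(" ")).filter(_.nonEmpty)

  // Total number of words: a single number for the whole input.
  def total(lines: Seq[String]): Long = words(lines).size.toLong

  // Frequency of each word: one count per distinct word.
  def frequencies(lines: Seq[String]): Map[String, Long] =
    words(lines).groupBy(identity).map { case (w, ws) => w -> ws.size.toLong }

  def main(args: Array[String]): Unit = {
    val lines = Seq("to be or not to be")
    println(total(lines))
    println(frequencies(lines))
  }
}
```

Summing the values of the frequency map gives back the total, which is one way to check that the two computations agree.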
>>
>> Hope this helps! I apologize if the code doesn't compile; I didn't test
>> it for syntax.
>>
>> TD
>>
>>
>>
>> On Thu, Jan 30, 2014 at 8:12 AM, Eduardo Costa Alfaia <
>> e.costaalfaia@unibs.it> wrote:
>>
>>> Hi Guys,
>>>
>>> I'm not a very good Java programmer, so could anybody help me with this
>>> piece of code from JavaNetworkWordcount:
>>>
>>> JavaPairDStream<String, Integer> wordCounts = words.map(
>>>         new PairFunction<String, String, Integer>() {
>>>           @Override
>>>           public Tuple2<String, Integer> call(String s) throws Exception {
>>>             return new Tuple2<String, Integer>(s, 1);
>>>           }
>>>         }).reduceByKey(new Function2<Integer, Integer, Integer>() {
>>>           @Override
>>>           public Integer call(Integer i1, Integer i2) throws Exception {
>>>             return i1 + i2;
>>>           }
>>>         });
>>>
>>>       JavaPairDStream<String, Integer> counts =
>>> wordCounts.reduceByKeyAndWindow(
>>>         new Function2<Integer, Integer, Integer>() {
>>>           public Integer call(Integer i1, Integer i2) { return i1 + i2; }
>>>         },
>>>         new Function2<Integer, Integer, Integer>() {
>>>           public Integer call(Integer i1, Integer i2) { return i1 - i2; }
>>>         },
>>>         new Duration(60 * 5 * 1000),
>>>         new Duration(1 * 1000)
>>>       );
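The two functions passed to reduceByKeyAndWindow above (i1 + i2 and i1 - i2) allow a windowed sum to be updated incrementally: add the batch entering the window and subtract the one leaving it. The sketch below models that idea on a plain sequence of per-batch counts for a single key. IncrementalWindow, the window length of 3 batches and the sample counts are hypothetical; it is not how Spark implements the operation internally.

```scala
object IncrementalWindow {
  // Recompute every window sum from scratch.
  def recomputed(counts: Vector[Int], window: Int): Vector[Int] =
    counts.indices.map { i =>
      counts.slice(math.max(0, i - window + 1), i + 1).sum
    }.toVector

  // Update the previous window sum: add the entering batch (reduce function)
  // and subtract the batch that just left the window (inverse function).
  def incremental(counts: Vector[Int], window: Int): Vector[Int] =
    counts.indices.foldLeft((0, Vector.empty[Int])) { case ((prev, acc), i) =>
      val entering = counts(i)
      val leaving = if (i - window >= 0) counts(i - window) else 0
      val next = prev + entering - leaving
      (next, acc :+ next)
    }._2

  def main(args: Array[String]): Unit = {
    val counts = Vector(1, 2, 3, 4, 5) // made-up per-batch counts for one key
    println(recomputed(counts, 3))
    println(incremental(counts, 3))
  }
}
```

Both functions return the same sums; the incremental form touches two batches per step regardless of the window length, which matters for a 5-minute window sliding every second. In Spark Streaming this form also needs checkpointing to be enabled.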
>>>
>>> I would like to find a way of counting and then summing to get the
>>> total number of words in a single file, for example a book in text
>>> format such as Don Quixote. The counts function gives me the result for
>>> each word found, not the total number of words in the file.
>>> Tathagata has sent me a piece of Scala code. Thanks, Tathagata, for your
>>> attention to my posts; I am very thankful.
>>>
>>>   yourDStream.foreachRDD(rdd => {
>>>
>>>    // Get and print first n elements
>>>    val firstN = rdd.take(n)
>>>    println("First N elements = " + firstN.mkString(", "))
>>>
>>>   // Count the number of elements in each batch
>>>   println("RDD has " + rdd.count() + " elements")
>>>
>>> })
>>>
>>> yourDStream.count.print()
>>>
>>> Could anybody help me?
>>>
>>>
>>> Thanks Guys
>>>
>>> --
>>>
>>> INFORMATION ON THE PROCESSING OF PERSONAL DATA
>>>
>>> The data used to send this message are processed by the Università
>>> degli Studi di Brescia exclusively for institutional purposes. More
>>> detailed information, including on the rights of the data subject, is
>>> given in the general notice and in the information published on the
>>> University's website in the "Privacy" section.
>>>
>>> The content of this message is intended solely for the persons to whom
>>> it is addressed and may contain information whose confidentiality is
>>> legally protected. Its reproduction, distribution and use are prohibited
>>> without the authorisation of the recipient. If the message was received
>>> in error, please delete it.
>>>
>>
>>
>


-- 
MSc Eduardo Costa Alfaia
PhD Student
Università degli Studi di Brescia


Re: Source code JavaNetworkWordcount

Posted by Tathagata Das <ta...@gmail.com>.
Yes. You should be able to.

Let's try to have future conversations through the
user@spark.incubator.apache.org mailing list :)


On Wed, Feb 5, 2014 at 2:33 PM, Eduardo Costa Alfaia <e.costaalfaia@unibs.it> wrote:

> So I could use reduceByKeyAndWindow like this
> val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
> ?
>
>
> > The reduceByKeyAndWindow and other *ByKey* operations work only on
> > DStreams of key-value pairs. "words" is a DStream[String], so it's not
> > key-value pairs. "words.map(x => (x, 1))" is a DStream[(String, Int)] that
> > has key-value pairs, so you can call reduceByKeyAndWindow.
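The key-value requirement described above can be modeled in plain Scala: extra methods are attached, through an implicit conversion, only to sequences whose elements are pairs. The sketch below is an analogy, not Spark code; PairOpsSketch and PairSeqFunctions are hypothetical names. In Spark 0.9 the corresponding conversion for DStreams comes into scope with import org.apache.spark.streaming.StreamingContext._.

```scala
object PairOpsSketch {
  // Extra methods that exist only for sequences of (key, value) tuples.
  implicit class PairSeqFunctions[K, V](self: Seq[(K, V)]) {
    def reduceByKey(f: (V, V) => V): Map[K, V] =
      self.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
  }

  def main(args: Array[String]): Unit = {
    val words: Seq[String] = Seq("a", "b", "a")
    // words.reduceByKey(_ + _)   // does not compile: not a member of Seq[String]
    val pairs: Seq[(String, Int)] = words.map(x => (x, 1))
    println(pairs.reduceByKey(_ + _))
  }
}
```

The commented-out line fails for the same kind of reason as the compiler error in the thread: the method is only reachable once the element type is a pair.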
> >
> > TD
> >
> >
> > On Wed, Feb 5, 2014 at 8:15 AM, Eduardo Costa Alfaia <e.costaalfaia@unibs.it> wrote:
> >
> >> Hi Tathagata
> >> I am playing with NetworkWordCount.scala and made some changes like this:
> >>
> >> // Create the context with a 1 second batch size
> >> 67     val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
> >> 68       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
> >> 69     ssc.checkpoint("hdfs://computer8:54310/user/root/INPUT")
> >> 70     // Create a socket text stream on target ip:port and count the
> >> 71     // words in the input stream of \n delimited text (eg. generated by 'nc')
> >> 72     val lines1 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY)
> >> 73     val lines2 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY)
> >> 74     val lines3 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY)
> >> 75     val union2 = lines1.union(lines2)
> >> 76     val union3 = union2.union(lines3)
> >> 77
> >> 78     //val words = lines.flatMap(_.split(" "))
> >> 79     val words = union3.flatMap(_.split(" "))
> >> 80     //val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
> >> 81     val wordCounts = words.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
> >>
> >> However, I got the error below:
> >>
> >> [error] /opt/unibs_test/incubator-spark-tdas/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala:81: value reduceByKeyAndWindow is not a member of org.apache.spark.streaming.dstream.DStream[String]
> >> [error]         val wordCounts = words.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
> >> [error]                                ^
> >> [error] one error found
> >> [error] (examples/compile:compile) Compilation failed
> >> [error] Total time: 15 s, completed 05-Feb-2014 17:10:38
> >>
> >>
> >> The classes are imported in the code:
> >>
> >> import org.apache.spark.streaming.{Seconds, StreamingContext}
> >> import org.apache.spark.streaming.StreamingContext._
> >> import org.apache.spark.storage.StorageLevel
> >>
> >>
> >> Thanks
> >>
> >> On Feb 5, 2014, at 5:22, Tathagata Das <ta...@gmail.com> wrote:
> >>
> >>> Seems good to me. BTW, it's fine to use MEMORY_ONLY (i.e. without
> >>> replication) for testing, but you should turn on replication if you
> >>> want fault-tolerance.
> >>>
> >>> TD

Re: Source code JavaNetworkWordcount

Posted by Eduardo Costa Alfaia <e....@unibs.it>.
So I could use reduceByKeyAndWindow like this
val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
?


> The reduceByKeyAndWindow and other *ByKey* operations work only on
> DStreams of key-value pairs. "words" is a DStream[String], so it's not
> key-value pairs. "words.map(x => (x, 1))" is a DStream[(String, Int)] that
> has key-value pairs, so you can call reduceByKeyAndWindow.
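What reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10)) computes can be modeled on plain collections: once per slide interval, reduce by key over the batches that fall inside the most recent window. The sketch below is an illustration under assumptions. Batches are numbered rather than timed, window and slide are given as numbers of batches (30 and 10 for the call above with 1-second batches), and WindowSketch is a hypothetical name.

```scala
object WindowSketch {
  type Batch = Seq[(String, Int)]

  // Sum the values of all pairs that share a key.
  def reduceByKey(pairs: Seq[(String, Int)]): Map[String, Int] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }

  // One result per slide: the reduction over the most recent `window` batches.
  def windowed(batches: Seq[Batch], window: Int, slide: Int): Seq[Map[String, Int]] =
    (slide to batches.size by slide).map { end =>
      reduceByKey(batches.slice(math.max(0, end - window), end).flatten)
    }

  def main(args: Array[String]): Unit = {
    // Four made-up batches of (word, 1) pairs.
    val batches: Seq[Batch] = Seq(
      Seq("a" -> 1), Seq("b" -> 1), Seq("a" -> 1), Seq("a" -> 1, "b" -> 1))
    windowed(batches, window = 3, slide = 2).foreach(println)
  }
}
```

Because the window is longer than the slide, consecutive results overlap: a word received once can be counted in several successive windows.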
> 
> TD
>>>>> Hi Tathagata,
>>>>> 
>>>>> Thank you by your explanations it'll be useful to me to understand how
>>>>> work this piece of code to do that we want. We have created a code in C
>>>>> which send a txt file, for example Don Quixote, like a stream over the
>>>>> network so we've changed the java code from JavaNetworkWordcount to
>>>> connect
>>>>> in each source described within source code. Bellow it is that we've
>>>>> inserted, three streams sources.
>>>>> 
>>>>>     JavaDStream<String> lines1 = ssc1.socketTextStream("localhost",
>>>>> Integer.parseInt("12345"));
>>>>>     JavaDStream<String> lines2 = ssc1.socketTextStream("localhost",
>>>>> Integer.parseInt("12345"));
>>>>>     JavaDStream<String> lines3 = ssc1.socketTextStream("localhost",
>>>>> Integer.parseInt("12345"));
>>>>>     JavaDStream<String> union2 = lines1.union(lines2);
>>>>>     JavaDStream<String> union3 = union2.union(lines3);
>>>>>     JavaDStream<String> words = union3.flatMap(new
>>>>> FlatMapFunction<String, String>() {
>>>>> 
>>>>> So, the second option that you've given me I think to be the better
>>>> option.
>>>>> Sorry Tathagata for my insistence in this case and I thank you by your
>>>>> patient.
>>>>> 
>>>>> Best Regards
>>>>> 
>>>>> 
>>>>> 2014-01-30 Tathagata Das <ta...@gmail.com>
>>>>> 
>>>>> Let me first ask for a few clarifications.
>>>>>> 
>>>>>> 1. If you just want to count the words in a single text file like Don
>>>>>> Quixote (that is, not for a stream of data), you should use only
>> Spark.
>>>>>> Then the program to count the frequency of words in a text file would
>>>> look
>>>>>> like this in Java. If you are not super-comfortable with Java, then I
>>>>>> strongly recommend using the Scala API or pyspark. For scala, it may
>> be
>>>> a
>>>>>> little trickier to learn if you have absolutely no idea. But it is
>> worth
>>>>>> it. The frequency count would look like this.
>>>>>> 
>>>>>> val sc = new SparkContext(...)
>>>>>> val linesInFile = sc.textFile("path_to_file")
>>>>>> val words = linesInFile.flatMap(line => line.split(" "))
>>>>>> val frequencies = words.map(word => (word, 1L)).reduceByKey(_ + _)
>>>>>> println("Word frequencies = " + frequences.collect())      // collect
>> is
>>>>>> costly if the file is large
>>>>>> 
>>>>>> 
>>>>>> 2. Let me assume that you want to do read a stream of text over the
>>>>>> network and then print the count of total number of words into a file.
>>>> Note
>>>>>> that it is "total number of words" and not "frequency of each word".
>> The
>>>>>> Java version would be something like this.
>>>>>> 
>>>>>> DStream<Integer> totalCounts = words.count();
>>>>>> 
>>>>>> totalCounts.foreachRDD(new Function2<JavaRDD<Long>, Time, Void>() {
>>>>>>  @Override public Void call(JavaRDD<Long> pairRDD, Time time) throws
>>>>>> Exception {
>>>>>>          Long totalCount = totalCounts.first();
>>>>>> 
>>>>>>          // print to screen
>>>>>>          System.out.println(totalCount);
>>>>>> 
>>>>>>         // append count to file
>>>>>>         ...
>>>>>>         return null;
>>>>>>   }
>>>>>> })
>>>>>> 
>>>>>> This is count how many words have been received in each batch. The
>> Scala
>>>>>> version would be much simpler to read.
>>>>>> 
>>>>>> words.count().foreachRDD(rdd => {
>>>>>>   val totalCount = rdd.first()
>>>>>> 
>>>>>>   // print to screen
>>>>>>   println(totalCount)
>>>>>> 
>>>>>>   // append count to file
>>>>>>   ...
>>>>>> })
>>>>>> 
>>>>>> Hope this helps! I apologize if the code doesnt compile, I didnt test
>>>> for
>>>>>> syntax and stuff.
>>>>>> 
>>>>>> TD
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Thu, Jan 30, 2014 at 8:12 AM, Eduardo Costa Alfaia <
>>>>>> e.costaalfaia@unibs.it> wrote:
>>>>>> 
>>>>>>> Hi Guys,
>>>>>>> 
>>>>>>> I'm not very good like java programmer, so anybody could me help with
>>>>>>> this
>>>>>>> code piece from JavaNetworkWordcount:
>>>>>>> 
>>>>>>> JavaPairDStream<String, Integer> wordCounts = words.map(
>>>>>>>       new PairFunction<String, String, Integer>() {
>>>>>>>    @Override
>>>>>>>         public Tuple2<String, Integer> call(String s) throws
>>>> Exception
>>>>>>> {
>>>>>>>           return new Tuple2<String, Integer>(s, 1);
>>>>>>>         }
>>>>>>>       }).reduceByKey(new Function2<Integer, Integer, Integer>() {
>>>>>>>         @Override
>>>>>>>         public Integer call(Integer i1, Integer i2) throws Exception
>>>> {
>>>>>>>           return i1 + i2;
>>>>>>>         }
>>>>>>>       });
>>>>>>> 
>>>>>>>     JavaPairDStream<String, Integer> counts =
>>>>>>> wordCounts.reduceByKeyAndWindow(
>>>>>>>       new Function2<Integer, Integer, Integer>() {
>>>>>>>         public Integer call(Integer i1, Integer i2) { return i1 +
>>>> i2; }
>>>>>>>       },
>>>>>>>       new Function2<Integer, Integer, Integer>() {
>>>>>>>         public Integer call(Integer i1, Integer i2) { return i1 -
>>>> i2; }
>>>>>>>       },
>>>>>>>       new Duration(60 * 5 * 1000),
>>>>>>>       new Duration(1 * 1000)
>>>>>>>     );
>>>>>>> 
>>>>>>> I would like to think a manner of counting and after summing  and
>>>>>>> getting a
>>>>>>> total from words counted in a single file, for example a book in txt
>>>>>>> extension Don Quixote. The counts function give me the resulted from
>>>> each
>>>>>>> word has found and not a total of words from the file.
>>>>>>> Tathagata has sent me a piece from scala code, Thanks Tathagata by
>> your
>>>>>>> attention with my posts I am very thankfully,
>>>>>>> 
>>>>>>> yourDStream.foreachRDD(rdd => {
>>>>>>> 
>>>>>>>  // Get and print first n elements
>>>>>>>  val firstN = rdd.take(n)
>>>>>>>  println("First N elements = " + firstN)
>>>>>>> 
>>>>>>> // Count the number of elements in each batch
>>>>>>> println("RDD has " + rdd.count() + " elements")
>>>>>>> 
>>>>>>> })
>>>>>>> 
>>>>>>> yourDStream.count.print()
>>>>>>> 
>>>>>>> Could anybody help me?
>>>>>>> 
>>>>>>> 
>>>>>>> Thanks Guys
>>>>>>> 
>>>>>>> --
>>>>>>> 
>>>>>>> INFORMATIVA SUL TRATTAMENTO DEI DATI PERSONALI
>>>>>>> 
>>>>>>> I dati utilizzati per l'invio del presente messaggio sono trattati
>>>>>>> dall'Università degli Studi di Brescia esclusivamente per finalità
>>>>>>> istituzionali. Informazioni più dettagliate anche in ordine ai
>> diritti
>>>>>>> 
>>>>>>> dell'interessato sono riposte nell'informativa generale e nelle
>> notizie
>>>>>>> pubblicate sul sito web dell'Ateneo nella sezione "Privacy".
>>>>>>> 
>>>>>>> Il contenuto di questo messaggio è rivolto unicamente alle persona
>> cui
>>>>>>> è indirizzato e può contenere informazioni la cui riservatezza è
>>>>>>> 
>>>>>>> tutelata legalmente. Ne sono vietati la riproduzione, la diffusione e
>>>>>>> l'uso
>>>>>>> in mancanza di autorizzazione del destinatario. Qualora il messaggio
>>>>>>> fosse pervenuto per errore, preghiamo di eliminarlo.
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>>> 
>>>> --
>>>> MSc Eduardo Costa Alfaia
>>>> PhD Student
>>>> Università degli Studi di Brescia
>>>> 
>>>> --
>>>> INFORMATIVA SUL TRATTAMENTO DEI DATI PERSONALI
>>>> 
>>>> I dati utilizzati per l'invio del presente messaggio sono trattati
>>>> dall'Università degli Studi di Brescia esclusivamente per finalità
>>>> istituzionali. Informazioni più dettagliate anche in ordine ai diritti
>>>> dell'interessato sono riposte nell'informativa generale e nelle notizie
>>>> pubblicate sul sito web dell'Ateneo nella sezione "Privacy".
>>>> 
>>>> Il contenuto di questo messaggio è rivolto unicamente alle persona cui
>>>> è indirizzato e può contenere informazioni la cui riservatezza è
>>>> tutelata legalmente. Ne sono vietati la riproduzione, la diffusione e
>> l'uso
>>>> in mancanza di autorizzazione del destinatario. Qualora il messaggio
>>>> fosse pervenuto per errore, preghiamo di eliminarlo.
>>>> 
>> 
>> 
>> --
>> INFORMATIVA SUL TRATTAMENTO DEI DATI PERSONALI
>> 
>> I dati utilizzati per l'invio del presente messaggio sono trattati
>> dall'Università degli Studi di Brescia esclusivamente per finalità
>> istituzionali. Informazioni più dettagliate anche in ordine ai diritti
>> dell'interessato sono riposte nell'informativa generale e nelle notizie
>> pubblicate sul sito web dell'Ateneo nella sezione "Privacy".
>> 
>> Il contenuto di questo messaggio è rivolto unicamente alle persona cui
>> è indirizzato e può contenere informazioni la cui riservatezza è
>> tutelata legalmente. Ne sono vietati la riproduzione, la diffusione e l'uso
>> in mancanza di autorizzazione del destinatario. Qualora il messaggio
>> fosse pervenuto per errore, preghiamo di eliminarlo.
>> 



Re: Source code JavaNetworkWordcount

Posted by Tathagata Das <ta...@gmail.com>.
reduceByKeyAndWindow and the other *ByKey* operations work only on
DStreams of key-value pairs. "words" is a DStream[String], so it does not
contain key-value pairs. "words.map(x => (x, 1))" is a DStream[(String, Int)],
which does contain key-value pairs, so you can call reduceByKeyAndWindow on it.

TD
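
The type distinction described above can be sketched as follows. This is a
minimal illustration, not code posted in the thread: it assumes the Spark
0.9-era streaming API used in the quoted example, and the object name
WindowedWordCount is made up here. Mapping each word to a (word, 1) pair
yields a DStream[(String, Int)], on which reduceByKeyAndWindow is available.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
// In Spark 0.9 this import supplies the implicit conversion that adds the
// *ByKey* operations to DStreams of pairs.
import org.apache.spark.streaming.StreamingContext._

// Hypothetical object name, chosen for this sketch only.
object WindowedWordCount {
  def main(args: Array[String]): Unit = {
    // Assumption: args(0) is the master URL, as in the thread's example.
    val ssc = new StreamingContext(args(0), "WindowedWordCount", Seconds(1))

    // Three socket sources combined with chained unions, as in the thread.
    val lines1 = ssc.socketTextStream("localhost", 12345, StorageLevel.MEMORY_ONLY)
    val lines2 = ssc.socketTextStream("localhost", 12345, StorageLevel.MEMORY_ONLY)
    val lines3 = ssc.socketTextStream("localhost", 12345, StorageLevel.MEMORY_ONLY)
    val union3 = lines1.union(lines2).union(lines3)

    val words = union3.flatMap(_.split(" "))   // DStream[String]: no keys yet
    val pairs = words.map(word => (word, 1))   // DStream[(String, Int)]

    // Sum the counts per word over a 30 second window, sliding every 10 seconds.
    val wordCounts =
      pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Both the window length and the slide interval need to be multiples of the
batch interval (1 second here), which holds for 30 and 10 seconds.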


On Wed, Feb 5, 2014 at 8:15 AM, Eduardo Costa Alfaia
<e.costaalfaia@unibs.it> wrote:

> Hi Tathagata,
> I am playing with NetworkWordCount.scala and made some changes, like this
> (in red):
>
>  // Create the context with a 1 second batch size
>  67     val ssc = new StreamingContext(args(0), "NetworkWordCount",
> Seconds(1),
>  68       System.getenv("SPARK_HOME"),
> StreamingContext.jarOfClass(this.getClass))
>  69         ssc.checkpoint("hdfs://computer8:54310/user/root/INPUT")
>  70     // Create a socket text stream on target ip:port and count the
>  71     // words in the input stream of \n delimited text (eg. generated
> by 'nc')
>  72     val lines1 = ssc.socketTextStream("localhost", "12345".toInt,
> StorageLevel.MEMORY_ONLY)
>  73     val lines2 = ssc.socketTextStream("localhost", "12345".toInt,
> StorageLevel.MEMORY_ONLY)
>  74     val lines3 = ssc.socketTextStream("localhost", "12345".toInt,
> StorageLevel.MEMORY_ONLY)
>  75     val union2 = lines1.union(lines2)
>  76     val union3 = union2.union(lines3)
>  77
>  78         //val words = lines.flatMap(_.split(" "))
>  79         val words = union3.flatMap(_.split(" "))
>  80 //    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
>  81         val wordCounts = words.reduceByKeyAndWindow(_ + _,
> Seconds(30), Seconds(10))
>
> However, I got the error below:
>
> [error]
> /opt/unibs_test/incubator-spark-tdas/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala:81:
> value reduceByKeyAndWindow is not a member of
> org.apache.spark.streaming.dstream.DStream[String]
> [error]         val wordCounts = words.reduceByKeyAndWindow(_ + _,
> Seconds(30), Seconds(10))
> [error]                                ^
> [error] one error found
> [error] (examples/compile:compile) Compilation failed
> [error] Total time: 15 s, completed 05-Feb-2014 17:10:38
>
>
> The classes are imported in the code:
>
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.StreamingContext._
> import org.apache.spark.storage.StorageLevel
>
>
> Thanks
>

Re: Source code JavaNetworkWordcount

Posted by Eduardo Costa Alfaia <e....@unibs.it>.
Hi Tathagata,
I am playing with NetworkWordCount.scala and made some changes, like this (in red):

 // Create the context with a 1 second batch size
 67     val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
 68       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
 69         ssc.checkpoint("hdfs://computer8:54310/user/root/INPUT")
 70     // Create a socket text stream on target ip:port and count the
 71     // words in the input stream of \n delimited text (eg. generated by 'nc')
 72     val lines1 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY)
 73     val lines2 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY)
 74     val lines3 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY)
 75     val union2 = lines1.union(lines2)
 76     val union3 = union2.union(lines3)
 77 
 78         //val words = lines.flatMap(_.split(" "))
 79         val words = union3.flatMap(_.split(" "))
 80 //    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
 81         val wordCounts = words.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

However, I got the error below:

[error] /opt/unibs_test/incubator-spark-tdas/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala:81: value reduceByKeyAndWindow is not a member of org.apache.spark.streaming.dstream.DStream[String]
[error] 	val wordCounts = words.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
[error] 	                       ^
[error] one error found
[error] (examples/compile:compile) Compilation failed
[error] Total time: 15 s, completed 05-Feb-2014 17:10:38


The classes are imported in the code:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel


Thanks

On Feb 5, 2014, at 5:22, Tathagata Das <ta...@gmail.com> wrote:

> Seems good to me. BTW, it's fine to use MEMORY_ONLY (i.e. without replication)
> for testing, but you should turn on replication if you want
> fault-tolerance.
> 
> TD
> 
> 
> On Mon, Feb 3, 2014 at 3:19 PM, Eduardo Costa Alfaia
> <e.costaalfaia@unibs.it> wrote:
> 
>> Hi Tathagata,
>> 
>> You were right when you told me to use Scala instead of Java; Scala is
>> very easy. I have implemented the code you gave me (in bold), and I have
>> also added a union (in red) because I am testing with 2 stream sources.
>> My idea is to use 3 or more stream sources and take their union.
>> 
>> object NetworkWordCount {
>>   def main(args: Array[String]) {
>>     if (args.length < 1) {
>>       System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
>>         "In local mode, <master> should be 'local[n]' with n > 1")
>>       System.exit(1)
>>     }
>>
>>     StreamingExamples.setStreamingLogLevels()
>>
>>     // Create the context with a 1 second batch size
>>     val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
>>       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
>>     ssc.checkpoint("hdfs://computer22:54310/user/root/INPUT")
>>     // Create a socket text stream on target ip:port and count the
>>     // words in the input stream of \n delimited text (eg. generated by 'nc')
>>     // Two socket streams and their union (highlighted in the original mail)
>>     val lines1 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY_SER)
>>     val lines2 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY_SER)
>>     val union2 = lines1.union(lines2)
>>     //val words = lines.flatMap(_.split(" "))
>>     val words = union2.flatMap(_.split(" "))
>>     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
>>
>>     // Per-batch total word count (highlighted in the original mail)
>>     words.count().foreachRDD(rdd => {
>>       val totalCount = rdd.first()
>>
>>       // print to screen
>>       println(totalCount)
>>
>>       // append count to file
>>       //  ...
>>     })
>>     //wordCounts.print()
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>> }
>> 
>> What do you think? Is my code right?
>> 
>> I obtained the following result:
>> 
>> root@computer8:/opt/unibs_test/incubator-spark-tdas# bin/run-example org.apache.spark.streaming.examples.NetworkWordCount spark://192.168.0.13:7077
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in [jar:file:/opt/unibs_test/incubator-spark-tdas/examples/target/scala-2.10/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in [jar:file:/opt/unibs_test/incubator-spark-tdas/assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> 14/02/04 00:02:07 INFO StreamingExamples: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
>> 14/02/04 00:02:07 INFO StreamingExamples: Setting log level to [WARN] for streaming example. To override add a custom log4j.properties to the classpath.
>> 0
>> 0
>> 0
>> 0
>> 0
>> 0
>> 0
>> 0
>> 0
>> 0
>> 0
>> 0
>> 90715
>> 1375825
>> 882490
>> 941226
>> 811032
>> 734399
>> 804453
>> 718688
>> 1058695
>> 854417
>> 813263
>> 798885
>> 785455
>> 952804
>> 780140
>> 697533
>> 
>> 
>> Thanks Tathagata.
>> 
>> Att
>> 
>> 
>> 2014-01-30 Eduardo Costa Alfaia <e....@unibs.it>:
>> 
>>> Hi Tathagata,
>>> 
>>> Thank you for your explanations; they will help me understand how this
>>> piece of code works so that we can do what we want. We have written a C
>>> program that sends a txt file, for example Don Quixote, as a stream over
>>> the network, so we changed the Java code from JavaNetworkWordcount to
>>> connect to each source described in the source code. Below is what we
>>> inserted: three stream sources.
>>> 
>>>      JavaDStream<String> lines1 = ssc1.socketTextStream("localhost",
>>> Integer.parseInt("12345"));
>>>      JavaDStream<String> lines2 = ssc1.socketTextStream("localhost",
>>> Integer.parseInt("12345"));
>>>      JavaDStream<String> lines3 = ssc1.socketTextStream("localhost",
>>> Integer.parseInt("12345"));
>>>      JavaDStream<String> union2 = lines1.union(lines2);
>>>      JavaDStream<String> union3 = union2.union(lines3);
>>>      JavaDStream<String> words = union3.flatMap(new
>>> FlatMapFunction<String, String>() {
>>> 
>>> So, the second option that you've given me I think to be the better
>> option.
>>> Sorry Tathagata for my insistence in this case and I thank you by your
>>> patient.
>>> 
>>> Best Regards
>>> 
>>> 
>>> 2014-01-30 Tathagata Das <ta...@gmail.com>
>>> 
>>> Let me first ask for a few clarifications.
>>>> 
>>>> 1. If you just want to count the words in a single text file like Don
>>>> Quixote (that is, not for a stream of data), you should use only Spark.
>>>> Then the program to count the frequency of words in a text file would
>> look
>>>> like this in Java. If you are not super-comfortable with Java, then I
>>>> strongly recommend using the Scala API or pyspark. For scala, it may be
>> a
>>>> little trickier to learn if you have absolutely no idea. But it is worth
>>>> it. The frequency count would look like this.
>>>> 
>>>> val sc = new SparkContext(...)
>>>> val linesInFile = sc.textFile("path_to_file")
>>>> val words = linesInFile.flatMap(line => line.split(" "))
>>>> val frequencies = words.map(word => (word, 1L)).reduceByKey(_ + _)
>>>> println("Word frequencies = " + frequencies.collect().mkString(", "))   // collect is costly if the file is large
>>>> 
>>>> 
>>>> 2. Let me assume that you want to read a stream of text over the
>>>> network and then print the total number of words into a file. Note
>>>> that it is "total number of words" and not "frequency of each word". The
>>>> Java version would be something like this.
>>>> 
>>>> JavaDStream<Long> totalCounts = words.count();
>>>>
>>>> totalCounts.foreachRDD(new Function2<JavaRDD<Long>, Time, Void>() {
>>>>   @Override public Void call(JavaRDD<Long> rdd, Time time) throws Exception {
>>>>           // the RDD for each batch holds one element: that batch's total
>>>>           Long totalCount = rdd.first();
>>>>
>>>>           // print to screen
>>>>           System.out.println(totalCount);
>>>>
>>>>           // append count to file
>>>>           ...
>>>>           return null;
>>>>   }
>>>> });
>>>> 
>>>> This counts how many words have been received in each batch. The Scala
>>>> version would be much simpler to read.
>>>> 
>>>> words.count().foreachRDD(rdd => {
>>>>    val totalCount = rdd.first()
>>>> 
>>>>    // print to screen
>>>>    println(totalCount)
>>>> 
>>>>    // append count to file
>>>>    ...
>>>> })
>>>> 
>>>> Hope this helps! I apologize if the code doesnt compile, I didnt test
>> for
>>>> syntax and stuff.
>>>> 
>>>> TD
>>>> 
>>>> 
>>>> 
>>>> On Thu, Jan 30, 2014 at 8:12 AM, Eduardo Costa Alfaia <
>>>> e.costaalfaia@unibs.it> wrote:
>>>> 
>>>>> Hi Guys,
>>>>> 
>>>>> I'm not very good like java programmer, so anybody could me help with
>>>>> this
>>>>> code piece from JavaNetworkWordcount:
>>>>> 
>>>>> JavaPairDStream<String, Integer> wordCounts = words.map(
>>>>>        new PairFunction<String, String, Integer>() {
>>>>>     @Override
>>>>>          public Tuple2<String, Integer> call(String s) throws
>> Exception
>>>>> {
>>>>>            return new Tuple2<String, Integer>(s, 1);
>>>>>          }
>>>>>        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
>>>>>          @Override
>>>>>          public Integer call(Integer i1, Integer i2) throws Exception
>> {
>>>>>            return i1 + i2;
>>>>>          }
>>>>>        });
>>>>> 
>>>>>      JavaPairDStream<String, Integer> counts =
>>>>> wordCounts.reduceByKeyAndWindow(
>>>>>        new Function2<Integer, Integer, Integer>() {
>>>>>          public Integer call(Integer i1, Integer i2) { return i1 +
>> i2; }
>>>>>        },
>>>>>        new Function2<Integer, Integer, Integer>() {
>>>>>          public Integer call(Integer i1, Integer i2) { return i1 -
>> i2; }
>>>>>        },
>>>>>        new Duration(60 * 5 * 1000),
>>>>>        new Duration(1 * 1000)
>>>>>      );
>>>>> 
>>>>> I would like to think a manner of counting and after summing  and
>>>>> getting a
>>>>> total from words counted in a single file, for example a book in txt
>>>>> extension Don Quixote. The counts function give me the resulted from
>> each
>>>>> word has found and not a total of words from the file.
>>>>> Tathagata has sent me a piece from scala code, Thanks Tathagata by your
>>>>> attention with my posts I am very thankfully,
>>>>> 
>>>>>  yourDStream.foreachRDD(rdd => {
>>>>> 
>>>>>   // Get and print first n elements
>>>>>   val firstN = rdd.take(n)
>>>>>   println("First N elements = " + firstN)
>>>>> 
>>>>>  // Count the number of elements in each batch
>>>>>  println("RDD has " + rdd.count() + " elements")
>>>>> 
>>>>> })
>>>>> 
>>>>> yourDStream.count.print()
>>>>> 
>>>>> Could anybody help me?
>>>>> 
>>>>> 
>>>>> Thanks Guys
>>>>> 
>>>>> 
>>>> 
>>>> 
>>> 
>> 
>> 
>> --
>> MSc Eduardo Costa Alfaia
>> PhD Student
>> Università degli Studi di Brescia
>> 
>> 


-- 
NOTICE ON THE PROCESSING OF PERSONAL DATA

The data used to send this message are processed by the
Università degli Studi di Brescia exclusively for institutional
purposes. More detailed information, including on the rights of the
data subject, is given in the general notice and in the information
published on the University's website in the "Privacy" section.

The content of this message is intended solely for the persons to whom
it is addressed and may contain information whose confidentiality is
legally protected. Reproduction, dissemination and use without the
authorization of the recipient are prohibited. If you have received
this message in error, please delete it.

Re: Source code JavaNetworkWordcount

Posted by Tathagata Das <ta...@gmail.com>.
Seems good to me. BTW, it's fine to use MEMORY_ONLY (i.e. without replication)
for testing, but you should turn on replication if you want
fault-tolerance.

TD
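
As a rough illustration of what the program's output means: each printed number should be the total number of words received in one 1-second batch across both sources, and the leading zeros would correspond to batches that arrived before any data was sent. The sketch below models this in plain Java without Spark. The class name BatchTotals, the sample lines, and the choice to skip empty lines are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchTotals {
    // Plain-Java stand-in for lines1.union(lines2): concatenate the lines
    // that every source delivered during one batch interval.
    public static List<String> union(List<List<String>> sources) {
        List<String> all = new ArrayList<>();
        for (List<String> lines : sources) {
            all.addAll(lines);
        }
        return all;
    }

    // Total number of words in one batch, splitting on a single space as
    // the thread's split(" ") does. Empty lines are skipped (an assumption).
    public static long totalWords(List<String> lines) {
        long total = 0;
        for (String line : lines) {
            if (!line.isEmpty()) {
                total += line.split(" ").length;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical lines received from two sources in the same batch.
        List<String> source1 = Arrays.asList("in a village of la mancha", "the name of which");
        List<String> source2 = Arrays.asList("i have no desire", "to call to mind");

        long total = totalWords(union(Arrays.asList(source1, source2)));
        System.out.println(total);

        // A batch with no input yields 0.
        System.out.println(totalWords(new ArrayList<String>()));
    }
}
```

Adding a third source only means adding one more list to the union; the counting step does not change.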


On Mon, Feb 3, 2014 at 3:19 PM, Eduardo Costa Alfaia <e.costaalfaia@unibs.it
> wrote:

> Hi Tathagata,
>
> You were right to suggest Scala over Java; Scala is very easy. I have
> implemented the code you gave me (the words.count().foreachRDD block), and I
> have also added a union (lines1.union(lines2)) because I am testing with 2
> stream sources. My idea is to use 3 or more stream sources and take their
> union.
>
> object NetworkWordCount {
>   def main(args: Array[String]) {
>     if (args.length < 1) {
>       System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
>         "In local mode, <master> should be 'local[n]' with n > 1")
>       System.exit(1)
>     }
>
>     StreamingExamples.setStreamingLogLevels()
>
>     // Create the context with a 1 second batch size
>     val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
>       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
>     ssc.checkpoint("hdfs://computer22:54310/user/root/INPUT")
>     // Create a socket text stream on target ip:port and count the
>     // words in the input stream of \n delimited text (eg. generated by 'nc')
>     val lines1 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY_SER)
>     val lines2 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY_SER)
>     val union2 = lines1.union(lines2)
>     //val words = lines.flatMap(_.split(" "))
>     val words = union2.flatMap(_.split(" "))
>     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
>
>     words.count().foreachRDD(rdd => {
>       val totalCount = rdd.first()
>
>       // print to screen
>       println(totalCount)
>
>       // append count to file
>       //  ...
>     })
>     //wordCounts.print()
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> What do you think? Is my code right?
>
> I obtained the following result:
>
> root@computer8:/opt/unibs_test/incubator-spark-tdas# bin/run-example org.apache.spark.streaming.examples.NetworkWordCount spark://192.168.0.13:7077
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in [jar:file:/opt/unibs_test/incubator-spark-tdas/examples/target/scala-2.10/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in [jar:file:/opt/unibs_test/incubator-spark-tdas/assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 14/02/04 00:02:07 INFO StreamingExamples: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
> 14/02/04 00:02:07 INFO StreamingExamples: Setting log level to [WARN] for streaming example. To override add a custom log4j.properties to the classpath.
> 0
> 0
> 0
> 0
> 0
> 0
> 0
> 0
> 0
> 0
> 0
> 0
> 90715
> 1375825
> 882490
> 941226
> 811032
> 734399
> 804453
> 718688
> 1058695
> 854417
> 813263
> 798885
> 785455
> 952804
> 780140
> 697533
>
>
> Thanks Tathagata.
>
> Regards
>
>
> 2014-01-30 Eduardo Costa Alfaia <e....@unibs.it>:
>
> > Hi Tathagata,
> >
> > Thank you for your explanations; they will help me understand how this
> > piece of code works so that we can do what we want. We have written a C
> > program that sends a text file, for example Don Quixote, as a stream over
> > the network, so we changed the Java code in JavaNetworkWordcount to connect
> > to each source described in the source code. Below is what we inserted:
> > three stream sources.
> >
> >       JavaDStream<String> lines1 = ssc1.socketTextStream("localhost", Integer.parseInt("12345"));
> >       JavaDStream<String> lines2 = ssc1.socketTextStream("localhost", Integer.parseInt("12345"));
> >       JavaDStream<String> lines3 = ssc1.socketTextStream("localhost", Integer.parseInt("12345"));
> >       JavaDStream<String> union2 = lines1.union(lines2);
> >       JavaDStream<String> union3 = union2.union(lines3);
> >       JavaDStream<String> words = union3.flatMap(new FlatMapFunction<String, String>() {
> >
> > So I think the second option you gave me is the better one.
> > Sorry, Tathagata, for my insistence, and thank you for your
> > patience.
> >
> > Best Regards
> >
> >
> > 2014-01-30 Tathagata Das <ta...@gmail.com>
> >
> > Let me first ask for a few clarifications.
> >>
> >> 1. If you just want to count the words in a single text file like Don
> >> Quixote (that is, not for a stream of data), you should use only Spark.
> >> If you are not super-comfortable with Java, then I strongly recommend
> >> using the Scala API or pyspark. Scala may be a little trickier to learn
> >> if you have absolutely no idea, but it is worth it. The program to count
> >> the frequency of words in a text file would look like this in Scala.
> >>
> >> val sc = new SparkContext(...)
> >> val linesInFile = sc.textFile("path_to_file")
> >> val words = linesInFile.flatMap(line => line.split(" "))
> >> val frequencies = words.map(word => (word, 1L)).reduceByKey(_ + _)
> >> println("Word frequencies = " + frequencies.collect().mkString(", "))   // collect is costly if the file is large
> >>
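
For readers who prefer to stay in Java, the same frequency count can be modelled with plain collections, which also makes the difference between "frequency of each word" and "total number of words" concrete: the total is simply the sum of all frequencies. This is a minimal sketch without Spark, assuming words are separated by single spaces; the class name WordFrequencies and the sample text are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordFrequencies {
    // Similar in spirit to flatMap(split) + map(word -> (word, 1L)) + reduceByKey(_ + _):
    // every occurrence of a word adds 1 to that word's counter.
    public static Map<String, Long> frequencies(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>(); // sorted keys, for stable printing
        for (String line : lines) {
            for (String word : line.split(" ")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    // The total number of words is the sum of the per-word frequencies.
    public static long totalWords(Map<String, Long> counts) {
        long total = 0;
        for (long c : counts.values()) {
            total += c;
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical input standing in for the lines of a text file.
        List<String> lines = Arrays.asList("to be or not to be", "that is the question");
        Map<String, Long> counts = frequencies(lines);
        System.out.println("Word frequencies = " + counts);
        System.out.println("Total words = " + totalWords(counts));
    }
}
```
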
> >>
> >> 2. Let me assume that you want to read a stream of text over the
> >> network and then print the total number of words into a file. Note
> >> that it is "total number of words" and not "frequency of each word". The
> >> Java version would be something like this.
> >>
> >> JavaDStream<Long> totalCounts = words.count();
> >>
> >> totalCounts.foreachRDD(new Function2<JavaRDD<Long>, Time, Void>() {
> >>    @Override public Void call(JavaRDD<Long> rdd, Time time) throws Exception {
> >>            // the RDD for each batch holds one element: that batch's total
> >>            Long totalCount = rdd.first();
> >>
> >>            // print to screen
> >>            System.out.println(totalCount);
> >>
> >>            // append count to file
> >>            ...
> >>            return null;
> >>    }
> >> });
> >>
> >> This counts how many words have been received in each batch. The Scala
> >> version would be much simpler to read.
> >>
> >> words.count().foreachRDD(rdd => {
> >>     val totalCount = rdd.first()
> >>
> >>     // print to screen
> >>     println(totalCount)
> >>
> >>     // append count to file
> >>     ...
> >> })
> >>
> >> Hope this helps! I apologize if the code doesn't compile; I didn't test
> >> it for syntax and stuff.
> >>
> >> TD
> >>
> >>
> >>
> >> On Thu, Jan 30, 2014 at 8:12 AM, Eduardo Costa Alfaia <
> >> e.costaalfaia@unibs.it> wrote:
> >>
> >>> Hi Guys,
> >>>
> >>> I'm not a very good Java programmer, so could anybody help me with
> >>> this piece of code from JavaNetworkWordcount:
> >>>
> >>> JavaPairDStream<String, Integer> wordCounts = words.map(
> >>>         new PairFunction<String, String, Integer>() {
> >>>           @Override
> >>>           public Tuple2<String, Integer> call(String s) throws Exception {
> >>>             return new Tuple2<String, Integer>(s, 1);
> >>>           }
> >>>         }).reduceByKey(new Function2<Integer, Integer, Integer>() {
> >>>           @Override
> >>>           public Integer call(Integer i1, Integer i2) throws Exception {
> >>>             return i1 + i2;
> >>>           }
> >>>         });
> >>>
> >>>       JavaPairDStream<String, Integer> counts = wordCounts.reduceByKeyAndWindow(
> >>>         new Function2<Integer, Integer, Integer>() {
> >>>           public Integer call(Integer i1, Integer i2) { return i1 + i2; }
> >>>         },
> >>>         new Function2<Integer, Integer, Integer>() {
> >>>           public Integer call(Integer i1, Integer i2) { return i1 - i2; }
> >>>         },
> >>>         new Duration(60 * 5 * 1000),
> >>>         new Duration(1 * 1000)
> >>>       );
> >>>
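
The two functions passed to reduceByKeyAndWindow above (one adding, one subtracting) suggest an incremental window: rather than re-summing every batch in the 5-minute window each second, the running total gains the batch that enters the window and loses the batch that leaves it. The sketch below models that idea for a single key in plain Java, without Spark. The class name WindowedCount, the 3-batch window, and the sample per-batch counts are assumptions chosen to keep the example short.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class WindowedCount {
    private final int windowBatches;                         // window length, in batches
    private final Deque<Integer> batches = new ArrayDeque<>();
    private int windowTotal = 0;

    public WindowedCount(int windowBatches) {
        this.windowBatches = windowBatches;
    }

    // Slide the window forward by one batch and return the new window total.
    public int slide(int newBatchCount) {
        batches.addLast(newBatchCount);
        windowTotal = windowTotal + newBatchCount;           // "add" step: i1 + i2
        if (batches.size() > windowBatches) {
            int leaving = batches.removeFirst();
            windowTotal = windowTotal - leaving;             // "subtract" step: i1 - i2
        }
        return windowTotal;
    }

    public static void main(String[] args) {
        // Hypothetical counts of one word in five consecutive 1-second batches.
        int[] perBatch = {2, 5, 1, 4, 0};
        WindowedCount window = new WindowedCount(3);
        for (int count : perBatch) {
            System.out.println(window.slide(count));
        }
    }
}
```

With a window of 300 one-second batches, as the Duration arguments above imply, each slide still costs one addition and one subtraction per key.
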
> >>> I would like to find a way of counting and then summing, to get the
> >>> total number of words in a single file, for example a book in txt
> >>> format such as Don Quixote. The counts function gives me the result for
> >>> each word found, not the total number of words in the file.
> >>> Tathagata sent me a piece of Scala code (thanks, Tathagata, for your
> >>> attention to my posts; I am very thankful):
> >>>
> >>>   yourDStream.foreachRDD(rdd => {
> >>>
> >>>    // Get and print first n elements
> >>>    val firstN = rdd.take(n)
> >>>    println("First N elements = " + firstN.mkString(", "))
> >>>
> >>>   // Count the number of elements in each batch
> >>>   println("RDD has " + rdd.count() + " elements")
> >>>
> >>> })
> >>>
> >>> yourDStream.count.print()
> >>>
> >>> Could anybody help me?
> >>>
> >>>
> >>> Thanks Guys
> >>>
> >>>
> >>
> >>
> >
>
>
> --
> MSc Eduardo Costa Alfaia
> PhD Student
> Università degli Studi di Brescia
>
>