Posted to user@spark.apache.org by SK <sk...@gmail.com> on 2014/07/11 23:57:37 UTC

Streaming training@ Spark Summit 2014

Hi,

I tried out the streaming program on the Spark training web page. I created
a Twitter app as per the instructions (pointing to http://www.twitter.com).
When I run the program, my credentials are printed correctly, but after
that the program just keeps waiting. It does not print the hashtag counts
or any other output. My code appears below (essentially the same as what is
on the training web page). I would like to know why I am not getting a
continuous stream and the hashtag counts.

Thanks

   // relevant code snippet

     TutorialHelper.configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)

     val ssc = new StreamingContext(new SparkConf(), Seconds(1))
     val tweets = TwitterUtils.createStream(ssc, None)
     val statuses = tweets.map(status => status.getText())
     statuses.print()

     ssc.checkpoint(checkpointDir)

     val words = statuses.flatMap(status => status.split(" "))
     val hashtags = words.filter(word => word.startsWith("#"))
     hashtags.print()

     val counts = hashtags.map(tag => (tag, 1))
                          .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60 * 5), Seconds(1))
     counts.print()

     val sortedCounts = counts.map { case (tag, count) => (count, tag) }
                              .transform(rdd => rdd.sortByKey(false))
     sortedCounts.foreach(rdd =>
       println("\nTop 10 hashtags:\n" + rdd.take(10).mkString("\n")))

     ssc.start()
     ssc.awaitTermination()

   // end code snippet




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Streaming training@ Spark Summit 2014

Posted by Tathagata Das <ta...@gmail.com>.
Does nothing get printed on the screen? If you are not getting any tweets
but Spark Streaming is running successfully, you should at least see the
counts being printed every batch (they would be zero). If they are not being
printed either, check the Spark web UI to see whether stages are running. If
they are not, you may not have enough cores to run.
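
The point about cores can be checked with a small helper. This is only a
sketch, assuming the program runs with a "local" master URL; the object and
method names (CoreCheck, localThreads, canProcessBatches) are made up for
illustration and are not part of Spark. It relies on the fact that each
receiver occupies one thread, so at least one more is needed to process
the batches.

```scala
// Sketch only: estimates the worker threads a "local" master URL provides.
// CoreCheck and its methods are hypothetical helpers, not Spark APIs.
object CoreCheck {
  private val LocalN = """local\[(\d+)\]""".r

  /** Threads available for a local master string; None for cluster URLs. */
  def localThreads(master: String): Option[Int] = master match {
    case "local"    => Some(1)
    case "local[*]" => Some(Runtime.getRuntime.availableProcessors())
    case LocalN(n)  => Some(n.toInt)
    case _          => None // e.g. spark://host:7077; check the web UI instead
  }

  /** Each receiver pins one thread, so more threads than receivers are needed. */
  def canProcessBatches(master: String, receivers: Int): Option[Boolean] =
    localThreads(master).map(_ > receivers)
}
```

The training program creates one Twitter receiver, so a master of "local"
would leave no thread for processing, while "local[2]" or more (for example
new SparkConf().setMaster("local[2]")) should let the batches run.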

TD



Re: Streaming training@ Spark Summit 2014

Posted by Soumya Simanta <so...@gmail.com>.
Try running a simple standalone program if you are using Scala and see if
you are getting any data. I use this to debug any connection/twitter4j
issues.


import twitter4j._

// Put your keys and credentials here.
object Util {
  val config = new twitter4j.conf.ConfigurationBuilder()
    .setOAuthConsumerKey("")
    .setOAuthConsumerSecret("")
    .setOAuthAccessToken("")
    .setOAuthAccessTokenSecret("")
    .build()
}

/**
 * Add this to your build.sbt:
 *   "org.twitter4j" % "twitter4j-stream" % "3.0.3"
 */
object SimpleStreamer extends App {

  // Prints the number of user mentions in every status received.
  def simpleStatusListener = new StatusListener() {
    def onStatus(status: Status): Unit = {
      println(status.getUserMentionEntities.length)
    }

    def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice): Unit = {}

    def onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = {}

    // Connection and credential problems show up here.
    def onException(ex: Exception): Unit = {
      ex.printStackTrace()
    }

    def onScrubGeo(arg0: Long, arg1: Long): Unit = {}

    def onStallWarning(warning: StallWarning): Unit = {}
  }

  // Only statuses containing one of these keywords are delivered.
  val keywords = List("dog", "cat")
  val twitterStream = new TwitterStreamFactory(Util.config).getInstance
  twitterStream.addListener(simpleStatusListener)
  twitterStream.filter(new FilterQuery().track(keywords.toArray))

}




Re: Streaming training@ Spark Summit 2014

Posted by SK <sk...@gmail.com>.
I don't have a proxy server.

thanks.




Re: Streaming training@ Spark Summit 2014

Posted by Soumya Simanta <so...@gmail.com>.
Do you have a proxy server?

If so, you need to set the proxy for twitter4j.
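
One way to do that is through JVM system properties, which twitter4j reads
with a "twitter4j." prefix when it builds its default configuration. A
minimal sketch follows; the ProxySetup object and the host and port used
with it are placeholders for illustration, not values from this thread.

```scala
// Sketch only: ProxySetup is a hypothetical helper; pass your own proxy details.
object ProxySetup {
  def configureTwitterProxy(host: String, port: Int): Unit = {
    // twitter4j picks up "twitter4j."-prefixed keys from the system
    // properties, so these must be set before the stream is created.
    System.setProperty("twitter4j.http.proxyHost", host)
    System.setProperty("twitter4j.http.proxyPort", port.toString)
  }
}
```

For example, ProxySetup.configureTwitterProxy("proxy.example.com", 8080)
would be called before TwitterUtils.createStream. When building the
configuration by hand, ConfigurationBuilder also offers setHttpProxyHost
and setHttpProxyPort for the same purpose.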


Re: Streaming training@ Spark Summit 2014

Posted by SK <sk...@gmail.com>.
I don't get any exceptions or error messages.

I tried it both with and without VPN and had the same outcome. But I can
try again without VPN later today and report back.

thanks.




Re: Streaming training@ Spark Summit 2014

Posted by Tathagata Das <ta...@gmail.com>.
You don't get any exception from twitter.com, such as a credentials error?

I have seen this happen when someone was behind a VPN to their office, and
Twitter was probably blocked there.
You could be having a similar issue.
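
A quick way to test for that kind of block is to try opening a plain TCP
connection from the same machine. This is only a sketch: the Reachability
object is a made-up helper, and the host name to test (for instance
stream.twitter.com on port 443) is an assumption about the endpoint the
streaming client connects to.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

// Sketch only: Reachability is a hypothetical helper for a one-off check.
object Reachability {
  /** True if a TCP connection to host:port succeeds within timeoutMs. */
  def canConnect(host: String, port: Int, timeoutMs: Int = 5000): Boolean = {
    val socket = new Socket()
    try {
      // Any failure (timeout, refusal, unresolved host) counts as unreachable.
      Try(socket.connect(new InetSocketAddress(host, port), timeoutMs)).isSuccess
    } finally {
      socket.close()
    }
  }
}
```

If Reachability.canConnect("stream.twitter.com", 443) returns false with
the VPN on and true with it off, the network is the likely culprit. A
successful connection does not prove the credentials or the API call are
fine, since a proxy or firewall may still intercept the traffic.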

TD

