You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by narendra <na...@gmail.com> on 2015/08/04 18:45:34 UTC

No Twitter Input from Kafka to Spark Streaming

My application takes Twitter4j tweets and publishes those to a topic in
Kafka. Spark Streaming subscribes to that topic for processing. But in
actual, Spark Streaming is not able to receive tweet data from Kafka so
Spark Streaming is running empty batch jobs with out input and I am not able
to see any output from Spark Streaming.

The code of the application is - 

import java.util.HashMap
import java.util.Properties
import twitter4j._
import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.json.DataObjectFactory;
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka._
import kafka.javaapi.producer.Producer
import kafka.producer.{KeyedMessage, ProducerConfig}
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ 

object TwitterPopularTags {
    def main(args: Array[String]) {

            /** Information necessary for accessing the Twitter API */
        val consumerKey= "2AgtQfH8rlyUDyfjwPOCDosEQ"
        val consumerSecret=
"vnG8uoaan4gPmoy1rFMbz3i19396jODwGRLRqsHBPTwSlMcUIl"
        val accessToken=
"33807905-2vYZMjZyRjFJQrrkPVQwmiQcZCnag6m2wKujpiu4e"
        val accessTokenSecret =
"X880Iq3YseBsAs3e8ZoHSOaDnN431dWJ6QpeMJO6VVAzm"
        val cb = new ConfigurationBuilder()
        cb.setOAuthConsumerKey(consumerKey)
        cb.setOAuthConsumerSecret(consumerSecret)
        cb.setOAuthAccessToken(accessToken)
        cb.setOAuthAccessTokenSecret(accessTokenSecret)
        cb.setJSONStoreEnabled(true)
        cb.setIncludeEntitiesEnabled(true)
        val twitterStream = new
TwitterStreamFactory(cb.build()).getInstance()      

        val KafkaTopic = "LiveTweets"
        /* kafka producer properties */
        val kafkaProducer = {
                        val props = new Properties()
                        props.put("metadata.broker.list",
"broker2:9092,localhost:9092")
                        props.put("serializer.class",
"kafka.serializer.StringEncoder")
                        props.put("request.required.acks", "1")
                        val config = new ProducerConfig(props)
                        new Producer[String, String](config)
                     }

                /* Invoked when a new tweet comes */
        val listener = new StatusListener() { 

                           override def onStatus(status: Status): Unit = {
                               val msg = new KeyedMessage[String,
String](KafkaTopic,DataObjectFactory.getRawJSON(status))
                               kafkaProducer.send(msg)
              }
                   override def onException(ex: Exception): Unit = throw ex

                  // no-op for the following events
                  override def onStallWarning(warning: StallWarning): Unit =
{}
                  override def onDeletionNotice(statusDeletionNotice:
StatusDeletionNotice): Unit = {}
                  override def onScrubGeo(userId: Long, upToStatusId: Long):
Unit = {}
                  override def
onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = {}
        }

        twitterStream.addListener(listener)
        // Create Spark Streaming context
        val sparkConf = new SparkConf().setAppName("Twitter-Kafka-Spark
Streaming")
        val sc = new SparkContext(sparkConf)
        val ssc = new StreamingContext(sc, Seconds(2))

        // Define the Kafka parameters, broker list must be specified
        val kafkaParams = Map("metadata.broker.list" ->
"broker2:9092,localhost:9092")
        val topics = Set(KafkaTopic)

        // Create the direct stream with the Kafka parameters and topics
        val kafkaStream = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc,kafkaParams,topics)
        val lines = kafkaStream.map(_._2)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()

  }
}

Spark Streaming web UI - 

<http://apache-spark-user-list.1001560.n3.nabble.com/file/n24131/streaming.png> 

<http://apache-spark-user-list.1001560.n3.nabble.com/file/n24131/sparkjobs.png> 


Thank you.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/No-Twitter-Input-from-Kafka-to-Spark-Streaming-tp24131.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: No Twitter Input from Kafka to Spark Streaming

Posted by Cody Koeninger <co...@koeninger.org>.
Have you tried using the console consumer to see if anything is actually
getting published to that topic?

On Tue, Aug 4, 2015 at 11:45 AM, narendra <na...@gmail.com> wrote:

> My application takes Twitter4j tweets and publishes those to a topic in
> Kafka. Spark Streaming subscribes to that topic for processing. But in
> actual, Spark Streaming is not able to receive tweet data from Kafka so
> Spark Streaming is running empty batch jobs with out input and I am not
> able
> to see any output from Spark Streaming.
>
> The code of the application is -
>
> import java.util.HashMap
> import java.util.Properties
> import twitter4j._
> import twitter4j.FilterQuery;
> import twitter4j.StallWarning;
> import twitter4j.Status;
> import twitter4j.StatusDeletionNotice;
> import twitter4j.StatusListener;
> import twitter4j.TwitterStream;
> import twitter4j.TwitterStreamFactory;
> import twitter4j.conf.ConfigurationBuilder;
> import twitter4j.json.DataObjectFactory;
> import kafka.serializer.StringDecoder
> import org.apache.spark.streaming.kafka._
> import kafka.javaapi.producer.Producer
> import kafka.producer.{KeyedMessage, ProducerConfig}
> import org.apache.spark._
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.StreamingContext._
>
> object TwitterPopularTags {
>     def main(args: Array[String]) {
>
>             /** Information necessary for accessing the Twitter API */
>         val consumerKey= "2AgtQfH8rlyUDyfjwPOCDosEQ"
>         val consumerSecret=
> "vnG8uoaan4gPmoy1rFMbz3i19396jODwGRLRqsHBPTwSlMcUIl"
>         val accessToken=
> "33807905-2vYZMjZyRjFJQrrkPVQwmiQcZCnag6m2wKujpiu4e"
>         val accessTokenSecret =
> "X880Iq3YseBsAs3e8ZoHSOaDnN431dWJ6QpeMJO6VVAzm"
>         val cb = new ConfigurationBuilder()
>         cb.setOAuthConsumerKey(consumerKey)
>         cb.setOAuthConsumerSecret(consumerSecret)
>         cb.setOAuthAccessToken(accessToken)
>         cb.setOAuthAccessTokenSecret(accessTokenSecret)
>         cb.setJSONStoreEnabled(true)
>         cb.setIncludeEntitiesEnabled(true)
>         val twitterStream = new
> TwitterStreamFactory(cb.build()).getInstance()
>
>         val KafkaTopic = "LiveTweets"
>         /* kafka producer properties */
>         val kafkaProducer = {
>                         val props = new Properties()
>                         props.put("metadata.broker.list",
> "broker2:9092,localhost:9092")
>                         props.put("serializer.class",
> "kafka.serializer.StringEncoder")
>                         props.put("request.required.acks", "1")
>                         val config = new ProducerConfig(props)
>                         new Producer[String, String](config)
>                      }
>
>                 /* Invoked when a new tweet comes */
>         val listener = new StatusListener() {
>
>                            override def onStatus(status: Status): Unit = {
>                                val msg = new KeyedMessage[String,
> String](KafkaTopic,DataObjectFactory.getRawJSON(status))
>                                kafkaProducer.send(msg)
>               }
>                    override def onException(ex: Exception): Unit = throw ex
>
>                   // no-op for the following events
>                   override def onStallWarning(warning: StallWarning): Unit
> =
> {}
>                   override def onDeletionNotice(statusDeletionNotice:
> StatusDeletionNotice): Unit = {}
>                   override def onScrubGeo(userId: Long, upToStatusId:
> Long):
> Unit = {}
>                   override def
> onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = {}
>         }
>
>         twitterStream.addListener(listener)
>         // Create Spark Streaming context
>         val sparkConf = new SparkConf().setAppName("Twitter-Kafka-Spark
> Streaming")
>         val sc = new SparkContext(sparkConf)
>         val ssc = new StreamingContext(sc, Seconds(2))
>
>         // Define the Kafka parameters, broker list must be specified
>         val kafkaParams = Map("metadata.broker.list" ->
> "broker2:9092,localhost:9092")
>         val topics = Set(KafkaTopic)
>
>         // Create the direct stream with the Kafka parameters and topics
>         val kafkaStream = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc,kafkaParams,topics)
>         val lines = kafkaStream.map(_._2)
>         val words = lines.flatMap(_.split(" "))
>         val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
>         wordCounts.print()
>         ssc.start()
>         ssc.awaitTermination()
>
>   }
> }
>
> Spark Streaming web UI -
>
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24131/streaming.png
> >
>
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24131/sparkjobs.png
> >
>
>
> Thank you.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/No-Twitter-Input-from-Kafka-to-Spark-Streaming-tp24131.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Re: No Twitter Input from Kafka to Spark Streaming

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
You just pasted your twitter credentials, consider changing it. :/

Thanks
Best Regards

On Wed, Aug 5, 2015 at 10:07 PM, narendra <na...@gmail.com> wrote:

> Thanks Akash for the answer. I added endpoint to the listener and now it is
> working.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/No-Twitter-Input-from-Kafka-to-Spark-Streaming-tp24131p24142.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Re: No Twitter Input from Kafka to Spark Streaming

Posted by narendra <na...@gmail.com>.
Thanks Akash for the answer. I added endpoint to the listener and now it is
working.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/No-Twitter-Input-from-Kafka-to-Spark-Streaming-tp24131p24142.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org