Posted to dev@spark.apache.org by Patrick Wendell <pw...@gmail.com> on 2014/05/24 03:00:45 UTC

Re: No output from Spark Streaming program with Spark 1.0

Hey Jim,

Do you see the same behavior if you run this outside of Eclipse?

Also, what happens if you print something to standard out when setting
up your streams (i.e. not inside of the foreach)? Do you see that output? This
could be a streaming issue, but it could also be something related to
the way it's running in Eclipse.
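For example, something along these lines -- just a rough sketch, assuming
the ssc and kafka_messages from your snippet:

    // Runs on the driver at setup time; if this never appears,
    // the problem is in the setup path, not in the batches.
    println("stream wired up: " + kafka_messages)

    // print() is a built-in output operation; here it prints the
    // per-batch record count with no custom logic that could fail.
    kafka_messages.count().print()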

- Patrick

On Fri, May 23, 2014 at 2:57 PM, Jim Donahue <jd...@adobe.com> wrote:
> I'm trying out 1.0 on a set of small Spark Streaming tests and am running
> into problems.  Here's one of the little programs I've used for a long
> time -- it reads a Kafka stream that contains Twitter JSON tweets and does
> some simple counting.  The program starts OK (it connects to the Kafka
> stream fine) and generates a stream of INFO logging messages, but never
> generates any output. :-(
>
> I'm running this in Eclipse, so there may be some class loading issue
> (loading the wrong class or something like that), but I'm not seeing
> anything in the console output.
>
> Thanks,
>
> Jim Donahue
> Adobe
>
>
>
> val kafka_messages =
>       KafkaUtils.createStream[Array[Byte], Array[Byte],
> kafka.serializer.DefaultDecoder, kafka.serializer.DefaultDecoder](ssc,
> propsMap, topicMap, StorageLevel.MEMORY_AND_DISK)
>
>
>      val messages = kafka_messages.map(_._2)
>
>
>      val total = ssc.sparkContext.accumulator(0)
>
>
>      val startTime = new java.util.Date().getTime()
>
>
>      val jsonstream = messages.map[JSONObject](message =>
>       {val string = new String(message);
>       val json = new JSONObject(string);
>       total += 1
>       json
>       }
>     )
>
>
>     val deleted = ssc.sparkContext.accumulator(0)
>
>
>     val msgstream = jsonstream.filter(json =>
>       if (!json.has("delete")) true else { deleted += 1; false}
>       )
>
>
>     msgstream.foreach(rdd => {
>       if(rdd.count() > 0){
>       val data = rdd.map(json => (json.has("entities"),
> json.length())).collect()
>       val entities: Double = data.count(t => t._1)
>       val fieldCounts = data.sortBy(_._2)
>       val minFields = fieldCounts(0)._2
>       val maxFields = fieldCounts(fieldCounts.size - 1)._2
>       val now = new java.util.Date()
>       val interval = (now.getTime() - startTime) / 1000
>       System.out.println(now.toString)
>       System.out.println("processing time: " + interval + " seconds")
>       System.out.println("total messages: " + total.value)
>       System.out.println("deleted messages: " + deleted.value)
>       System.out.println("message receipt rate: " + (total.value/interval)
> + " per second")
>       System.out.println("messages this interval: " + data.length)
>       System.out.println("message fields varied between: " + minFields + "
> and " + maxFields)
>       System.out.println("fraction with entities is " + (entities /
> data.length))
>       }
>     }
>     )
>
>     ssc.start()
>

Re: No output from Spark Streaming program with Spark 1.0

Posted by Tathagata Das <ta...@gmail.com>.
What does the kafka receiver status on the streaming UI say when you are
connected to the Kafka sources? Does it show any error?

Can you find out which machine the receiver is running on and check the
worker logs for any exceptions / error messages? Try turning on the DEBUG
level in log4j.
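For example, something like this in the driver / worker setup raises the
level for just the packages of interest (the package names below are only
examples; a log4j.properties entry works the same way):

    import org.apache.log4j.{Level, Logger}

    // Root-level DEBUG is extremely noisy, so scope it to the
    // streaming and Kafka code paths only.
    Logger.getLogger("org.apache.spark.streaming").setLevel(Level.DEBUG)
    Logger.getLogger("kafka").setLevel(Level.DEBUG)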

TD
On May 24, 2014 4:58 PM, "Jim Donahue" <jd...@adobe.com> wrote:

> I looked at the Streaming UI for my job and it reports that it has
> processed many batches, but that none of the batches had any records in
> them. Unfortunately, that’s what I expected.  :-(
>
> I’ve tried multiple test programs and I’m seeing the same thing.  The
> Kafka sources are alive and well and the programs all worked on 0.9 from
> Eclipse.  And there’s no indication of any failure — just no records are
> being delivered.
>
> Any ideas would be much appreciated …
>
>
> Thanks,
>
> Jim
>
>
> On 5/23/14, 7:29 PM, "Tathagata Das" <ta...@gmail.com> wrote:
>
> >Few more suggestions.
> >1. See the web ui, is the system running any jobs? If not, then you may
> >need to give the system more nodes. Basically the system should have more
> >cores than the number of receivers.
> >2. Furthermore there is a streaming specific web ui which gives more
> >streaming specific data.
> >
> >
> >On Fri, May 23, 2014 at 6:02 PM, Patrick Wendell <pw...@gmail.com>
> >wrote:
> >
> >> Also one other thing to try, try removing all of the logic from inside
> >> of foreach and just printing something. It could be that somehow an
> >> exception is being triggered inside of your foreach block and as a
> >> result the output goes away.
> >>
> >> On Fri, May 23, 2014 at 6:00 PM, Patrick Wendell <pw...@gmail.com>
> >> wrote:
> >> > Hey Jim,
> >> >
> >> > Do you see the same behavior if you run this outside of eclipse?
> >> >
> >> > Also, what happens if you print something to standard out when setting
> >> > up your streams (i.e. not inside of the foreach) do you see that? This
> >> > could be a streaming issue, but it could also be something related to
> >> > the way it's running in eclipse.
> >> >
> >> > - Patrick
> >> >
> >> > On Fri, May 23, 2014 at 2:57 PM, Jim Donahue <jd...@adobe.com>
> >>wrote:
> >> >> I'm trying out 1.0 on a set of small Spark Streaming tests and am
> >> running
> >> >> into problems.  Here's one of the little programs I've used for a
> >>long
> >> >> time -- it reads a Kafka stream that contains Twitter JSON tweets and
> >> does
> >> >> some simple counting.  The program starts OK (it connects to the
> >>Kafka
> >> >> stream fine) and generates a stream of INFO logging messages, but
> >>never
> >> >> generates any output. :-(
> >> >>
> >> >> I'm running this in Eclipse, so there may be some class loading issue
> >> >> (loading the wrong class or something like that), but I'm not seeing
> >> >> anything in the console output.
> >> >>
> >> >> Thanks,
> >> >>
> >> >> Jim Donahue
> >> >> Adobe
> >> >>
> >> >>
> >> >>
> >> >> val kafka_messages =
> >> >>       KafkaUtils.createStream[Array[Byte], Array[Byte],
> >> >> kafka.serializer.DefaultDecoder,
> >>kafka.serializer.DefaultDecoder](ssc,
> >> >> propsMap, topicMap, StorageLevel.MEMORY_AND_DISK)
> >> >>
> >> >>
> >> >>      val messages = kafka_messages.map(_._2)
> >> >>
> >> >>
> >> >>      val total = ssc.sparkContext.accumulator(0)
> >> >>
> >> >>
> >> >>      val startTime = new java.util.Date().getTime()
> >> >>
> >> >>
> >> >>      val jsonstream = messages.map[JSONObject](message =>
> >> >>       {val string = new String(message);
> >> >>       val json = new JSONObject(string);
> >> >>       total += 1
> >> >>       json
> >> >>       }
> >> >>     )
> >> >>
> >> >>
> >> >>     val deleted = ssc.sparkContext.accumulator(0)
> >> >>
> >> >>
> >> >>     val msgstream = jsonstream.filter(json =>
> >> >>       if (!json.has("delete")) true else { deleted += 1; false}
> >> >>       )
> >> >>
> >> >>
> >> >>     msgstream.foreach(rdd => {
> >> >>       if(rdd.count() > 0){
> >> >>       val data = rdd.map(json => (json.has("entities"),
> >> >> json.length())).collect()
> >> >>       val entities: Double = data.count(t => t._1)
> >> >>       val fieldCounts = data.sortBy(_._2)
> >> >>       val minFields = fieldCounts(0)._2
> >> >>       val maxFields = fieldCounts(fieldCounts.size - 1)._2
> >> >>       val now = new java.util.Date()
> >> >>       val interval = (now.getTime() - startTime) / 1000
> >> >>       System.out.println(now.toString)
> >> >>       System.out.println("processing time: " + interval + " seconds")
> >> >>       System.out.println("total messages: " + total.value)
> >> >>       System.out.println("deleted messages: " + deleted.value)
> >> >>       System.out.println("message receipt rate: " +
> >> (total.value/interval)
> >> >> + " per second")
> >> >>       System.out.println("messages this interval: " + data.length)
> >> >>       System.out.println("message fields varied between: " +
> >>minFields
> >> + "
> >> >> and " + maxFields)
> >> >>       System.out.println("fraction with entities is " + (entities /
> >> >> data.length))
> >> >>       }
> >> >>     }
> >> >>     )
> >> >>
> >> >>     ssc.start()
> >> >>
> >>
>
>

Re: No output from Spark Streaming program with Spark 1.0

Posted by Jim Donahue <jd...@adobe.com>.
I looked at the Streaming UI for my job and it reports that it has
processed many batches, but that none of the batches had any records in
them. Unfortunately, that’s what I expected.  :-(

I’ve tried multiple test programs and I’m seeing the same thing.  The
Kafka sources are alive and well and the programs all worked on 0.9 from
Eclipse.  And there’s no indication of any failure — just no records are
being delivered.

Any ideas would be much appreciated …


Thanks,

Jim


On 5/23/14, 7:29 PM, "Tathagata Das" <ta...@gmail.com> wrote:

>Few more suggestions.
>1. See the web ui, is the system running any jobs? If not, then you may
>need to give the system more nodes. Basically the system should have more
>cores than the number of receivers.
>2. Furthermore there is a streaming specific web ui which gives more
>streaming specific data.
>
>
>On Fri, May 23, 2014 at 6:02 PM, Patrick Wendell <pw...@gmail.com>
>wrote:
>
>> Also one other thing to try, try removing all of the logic from inside
>> of foreach and just printing something. It could be that somehow an
>> exception is being triggered inside of your foreach block and as a
>> result the output goes away.
>>
>> On Fri, May 23, 2014 at 6:00 PM, Patrick Wendell <pw...@gmail.com>
>> wrote:
>> > Hey Jim,
>> >
>> > Do you see the same behavior if you run this outside of eclipse?
>> >
>> > Also, what happens if you print something to standard out when setting
>> > up your streams (i.e. not inside of the foreach) do you see that? This
>> > could be a streaming issue, but it could also be something related to
>> > the way it's running in eclipse.
>> >
>> > - Patrick
>> >
>> > On Fri, May 23, 2014 at 2:57 PM, Jim Donahue <jd...@adobe.com>
>>wrote:
>> >> I'm trying out 1.0 on a set of small Spark Streaming tests and am
>> running
>> >> into problems.  Here's one of the little programs I've used for a
>>long
>> >> time -- it reads a Kafka stream that contains Twitter JSON tweets and
>> does
>> >> some simple counting.  The program starts OK (it connects to the
>>Kafka
>> >> stream fine) and generates a stream of INFO logging messages, but
>>never
>> >> generates any output. :-(
>> >>
>> >> I'm running this in Eclipse, so there may be some class loading issue
>> >> (loading the wrong class or something like that), but I'm not seeing
>> >> anything in the console output.
>> >>
>> >> Thanks,
>> >>
>> >> Jim Donahue
>> >> Adobe
>> >>
>> >>
>> >>
>> >> val kafka_messages =
>> >>       KafkaUtils.createStream[Array[Byte], Array[Byte],
>> >> kafka.serializer.DefaultDecoder,
>>kafka.serializer.DefaultDecoder](ssc,
>> >> propsMap, topicMap, StorageLevel.MEMORY_AND_DISK)
>> >>
>> >>
>> >>      val messages = kafka_messages.map(_._2)
>> >>
>> >>
>> >>      val total = ssc.sparkContext.accumulator(0)
>> >>
>> >>
>> >>      val startTime = new java.util.Date().getTime()
>> >>
>> >>
>> >>      val jsonstream = messages.map[JSONObject](message =>
>> >>       {val string = new String(message);
>> >>       val json = new JSONObject(string);
>> >>       total += 1
>> >>       json
>> >>       }
>> >>     )
>> >>
>> >>
>> >>     val deleted = ssc.sparkContext.accumulator(0)
>> >>
>> >>
>> >>     val msgstream = jsonstream.filter(json =>
>> >>       if (!json.has("delete")) true else { deleted += 1; false}
>> >>       )
>> >>
>> >>
>> >>     msgstream.foreach(rdd => {
>> >>       if(rdd.count() > 0){
>> >>       val data = rdd.map(json => (json.has("entities"),
>> >> json.length())).collect()
>> >>       val entities: Double = data.count(t => t._1)
>> >>       val fieldCounts = data.sortBy(_._2)
>> >>       val minFields = fieldCounts(0)._2
>> >>       val maxFields = fieldCounts(fieldCounts.size - 1)._2
>> >>       val now = new java.util.Date()
>> >>       val interval = (now.getTime() - startTime) / 1000
>> >>       System.out.println(now.toString)
>> >>       System.out.println("processing time: " + interval + " seconds")
>> >>       System.out.println("total messages: " + total.value)
>> >>       System.out.println("deleted messages: " + deleted.value)
>> >>       System.out.println("message receipt rate: " +
>> (total.value/interval)
>> >> + " per second")
>> >>       System.out.println("messages this interval: " + data.length)
>> >>       System.out.println("message fields varied between: " +
>>minFields
>> + "
>> >> and " + maxFields)
>> >>       System.out.println("fraction with entities is " + (entities /
>> >> data.length))
>> >>       }
>> >>     }
>> >>     )
>> >>
>> >>     ssc.start()
>> >>
>>


Re: No output from Spark Streaming program with Spark 1.0

Posted by Tathagata Das <ta...@gmail.com>.
A few more suggestions.
1. Check the web UI: is the system running any jobs? If not, you may
need to give the system more nodes. Basically, the system should have more
cores than the number of receivers (see the sketch after this list).
2. There is also a streaming-specific web UI which gives more
streaming-specific data.
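For point 1, a minimal sketch of the context setup for a local run from
Eclipse. The app name and the 10-second batch interval are placeholders;
the important part is giving the master more than one core, so the single
Kafka receiver doesn't occupy the only slot and starve the processing jobs:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("KafkaTweetCounts")   // placeholder name
      .setMaster("local[4]")            // more cores than receivers (1 here)
    val ssc = new StreamingContext(conf, Seconds(10))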


On Fri, May 23, 2014 at 6:02 PM, Patrick Wendell <pw...@gmail.com> wrote:

> Also one other thing to try, try removing all of the logic from inside
> of foreach and just printing something. It could be that somehow an
> exception is being triggered inside of your foreach block and as a
> result the output goes away.
>
> On Fri, May 23, 2014 at 6:00 PM, Patrick Wendell <pw...@gmail.com>
> wrote:
> > Hey Jim,
> >
> > Do you see the same behavior if you run this outside of eclipse?
> >
> > Also, what happens if you print something to standard out when setting
> > up your streams (i.e. not inside of the foreach) do you see that? This
> > could be a streaming issue, but it could also be something related to
> > the way it's running in eclipse.
> >
> > - Patrick
> >
> > On Fri, May 23, 2014 at 2:57 PM, Jim Donahue <jd...@adobe.com> wrote:
> >> I'm trying out 1.0 on a set of small Spark Streaming tests and am
> running
> >> into problems.  Here's one of the little programs I've used for a long
> >> time -- it reads a Kafka stream that contains Twitter JSON tweets and
> does
> >> some simple counting.  The program starts OK (it connects to the Kafka
> >> stream fine) and generates a stream of INFO logging messages, but never
> >> generates any output. :-(
> >>
> >> I'm running this in Eclipse, so there may be some class loading issue
> >> (loading the wrong class or something like that), but I'm not seeing
> >> anything in the console output.
> >>
> >> Thanks,
> >>
> >> Jim Donahue
> >> Adobe
> >>
> >>
> >>
> >> val kafka_messages =
> >>       KafkaUtils.createStream[Array[Byte], Array[Byte],
> >> kafka.serializer.DefaultDecoder, kafka.serializer.DefaultDecoder](ssc,
> >> propsMap, topicMap, StorageLevel.MEMORY_AND_DISK)
> >>
> >>
> >>      val messages = kafka_messages.map(_._2)
> >>
> >>
> >>      val total = ssc.sparkContext.accumulator(0)
> >>
> >>
> >>      val startTime = new java.util.Date().getTime()
> >>
> >>
> >>      val jsonstream = messages.map[JSONObject](message =>
> >>       {val string = new String(message);
> >>       val json = new JSONObject(string);
> >>       total += 1
> >>       json
> >>       }
> >>     )
> >>
> >>
> >>     val deleted = ssc.sparkContext.accumulator(0)
> >>
> >>
> >>     val msgstream = jsonstream.filter(json =>
> >>       if (!json.has("delete")) true else { deleted += 1; false}
> >>       )
> >>
> >>
> >>     msgstream.foreach(rdd => {
> >>       if(rdd.count() > 0){
> >>       val data = rdd.map(json => (json.has("entities"),
> >> json.length())).collect()
> >>       val entities: Double = data.count(t => t._1)
> >>       val fieldCounts = data.sortBy(_._2)
> >>       val minFields = fieldCounts(0)._2
> >>       val maxFields = fieldCounts(fieldCounts.size - 1)._2
> >>       val now = new java.util.Date()
> >>       val interval = (now.getTime() - startTime) / 1000
> >>       System.out.println(now.toString)
> >>       System.out.println("processing time: " + interval + " seconds")
> >>       System.out.println("total messages: " + total.value)
> >>       System.out.println("deleted messages: " + deleted.value)
> >>       System.out.println("message receipt rate: " +
> (total.value/interval)
> >> + " per second")
> >>       System.out.println("messages this interval: " + data.length)
> >>       System.out.println("message fields varied between: " + minFields
> + "
> >> and " + maxFields)
> >>       System.out.println("fraction with entities is " + (entities /
> >> data.length))
> >>       }
> >>     }
> >>     )
> >>
> >>     ssc.start()
> >>
>

Re: No output from Spark Streaming program with Spark 1.0

Posted by Patrick Wendell <pw...@gmail.com>.
One other thing to try: remove all of the logic from inside
of foreach and just print something. It could be that somehow an
exception is being triggered inside of your foreach block and as a
result the output goes away.
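For example, a stripped-down version along these lines (a sketch reusing
your msgstream) would show whether the output operation fires at all:

    msgstream.foreach(rdd => {
      // No JSON handling, no accumulators -- just one println, so an
      // exception in the original body can't silently swallow the output.
      System.out.println("foreach fired, records in batch: " + rdd.count())
    })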

On Fri, May 23, 2014 at 6:00 PM, Patrick Wendell <pw...@gmail.com> wrote:
> Hey Jim,
>
> Do you see the same behavior if you run this outside of eclipse?
>
> Also, what happens if you print something to standard out when setting
> up your streams (i.e. not inside of the foreach) do you see that? This
> could be a streaming issue, but it could also be something related to
> the way it's running in eclipse.
>
> - Patrick
>
> On Fri, May 23, 2014 at 2:57 PM, Jim Donahue <jd...@adobe.com> wrote:
>> I'm trying out 1.0 on a set of small Spark Streaming tests and am running
>> into problems.  Here's one of the little programs I've used for a long
>> time -- it reads a Kafka stream that contains Twitter JSON tweets and does
>> some simple counting.  The program starts OK (it connects to the Kafka
>> stream fine) and generates a stream of INFO logging messages, but never
>> generates any output. :-(
>>
>> I'm running this in Eclipse, so there may be some class loading issue
>> (loading the wrong class or something like that), but I'm not seeing
>> anything in the console output.
>>
>> Thanks,
>>
>> Jim Donahue
>> Adobe
>>
>>
>>
>> val kafka_messages =
>>       KafkaUtils.createStream[Array[Byte], Array[Byte],
>> kafka.serializer.DefaultDecoder, kafka.serializer.DefaultDecoder](ssc,
>> propsMap, topicMap, StorageLevel.MEMORY_AND_DISK)
>>
>>
>>      val messages = kafka_messages.map(_._2)
>>
>>
>>      val total = ssc.sparkContext.accumulator(0)
>>
>>
>>      val startTime = new java.util.Date().getTime()
>>
>>
>>      val jsonstream = messages.map[JSONObject](message =>
>>       {val string = new String(message);
>>       val json = new JSONObject(string);
>>       total += 1
>>       json
>>       }
>>     )
>>
>>
>>     val deleted = ssc.sparkContext.accumulator(0)
>>
>>
>>     val msgstream = jsonstream.filter(json =>
>>       if (!json.has("delete")) true else { deleted += 1; false}
>>       )
>>
>>
>>     msgstream.foreach(rdd => {
>>       if(rdd.count() > 0){
>>       val data = rdd.map(json => (json.has("entities"),
>> json.length())).collect()
>>       val entities: Double = data.count(t => t._1)
>>       val fieldCounts = data.sortBy(_._2)
>>       val minFields = fieldCounts(0)._2
>>       val maxFields = fieldCounts(fieldCounts.size - 1)._2
>>       val now = new java.util.Date()
>>       val interval = (now.getTime() - startTime) / 1000
>>       System.out.println(now.toString)
>>       System.out.println("processing time: " + interval + " seconds")
>>       System.out.println("total messages: " + total.value)
>>       System.out.println("deleted messages: " + deleted.value)
>>       System.out.println("message receipt rate: " + (total.value/interval)
>> + " per second")
>>       System.out.println("messages this interval: " + data.length)
>>       System.out.println("message fields varied between: " + minFields + "
>> and " + maxFields)
>>       System.out.println("fraction with entities is " + (entities /
>> data.length))
>>       }
>>     }
>>     )
>>
>>     ssc.start()
>>