You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Cui Lin <cu...@hds.com> on 2015/03/05 06:43:15 UTC

How to parse Json formatted Kafka message in spark streaming

Friends,

I'm trying to parse json formatted Kafka messages and then send back to cassandra.I have two problems:

  1.  I got the exception below. How to check an empty RDD?

Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)


val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](…)

messages.foreachRDD { rdd =>
  val message:RDD[String] = rdd.map { y => y._2 }
  sqlContext.jsonRDD(message).registerTempTable("tempTable")
  sqlContext.sql("SELECT time,To FROM tempTable")
    .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
}

2. how to get all column names from json messages? I have hundreds of columns in the json formatted message.

Thanks for your help!




Best regards,

Cui Lin

Re: How to parse Json formatted Kafka message in spark streaming

Posted by Cui Lin <cu...@hds.com>.
Hi, Helena,

I think your new version only fits to the json that has very limited columns. I couldn’t find MonthlyCommits, but I assume it only has small number of columns that are defined manually.
In my case, I have hundreds of column names so it is not feasible to define any class for these columns.

Is there any way to get column name instead of hard code “time” in this case?
 mapper.readValue[Map[String,Any]](x).get("time")

Best regards,

Cui Lin

From: Helena Edelson <he...@datastax.com>>
Date: Thursday, March 5, 2015 at 7:02 AM
To: Ted Yu <yu...@gmail.com>>
Cc: Akhil Das <ak...@sigmoidanalytics.com>>, Cui Lin <Cu...@hds.com>>, "user@spark.apache.org<ma...@spark.apache.org>" <us...@spark.apache.org>>
Subject: Re: How to parse Json formatted Kafka message in spark streaming

Great point :) Cui, Here’s a cleaner way than I had before, w/out the use of spark sql for the mapping:


KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafka.kafkaParams, Map("github" -> 5), StorageLevel.MEMORY_ONLY)
  .map{ case (k,v) => JsonParser.parse(v).extract[MonthlyCommits]}
  .saveToCassandra("githubstats","monthly_commits")

[datastax_logo.png]<http://www.datastax.com/>
HELENA EDELSON
Senior Software Engineer,  DSE Analytics

[linkedin.png]<https://www.linkedin.com/in/helenaedelson>[twitter.png]<https://twitter.com/helenaedelson>[https://lh3.googleusercontent.com/osrzRgrOxm-gW72LtTXbYGuQkFiBqViXEQBVw4v_cbl99iphx_LETFoz0Ew_bYfYSqIg53gwho5elasykBtuKj1we5KqatfDbvYYw3vnupBmLrs0kkL0t4l9u8JDQqzwLw]<https://github.com/helena>

On Mar 5, 2015, at 9:33 AM, Ted Yu <yu...@gmail.com>> wrote:

Cui:
You can check messages.partitions.size to determine whether messages is an empty RDD.

Cheers

On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das <ak...@sigmoidanalytics.com>> wrote:
When you use KafkaUtils.createStream with StringDecoders, it will return String objects inside your messages stream. To access the elements from the json, you could do something like the following:


   val mapStream = messages.map(x=> {
      val mapper = new ObjectMapper() with ScalaObjectMapper
      mapper.registerModule(DefaultScalaModule)

      mapper.readValue[Map[String,Any]](x).get("time")
    })



Thanks
Best Regards

On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin <cu...@hds.com>> wrote:
Friends,

I'm trying to parse json formatted Kafka messages and then send back to cassandra.I have two problems:

  1.  I got the exception below. How to check an empty RDD?

Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)


val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](…)

messages.foreachRDD { rdd =>
  val message:RDD[String] = rdd.map { y => y._2 }
  sqlContext.jsonRDD(message).registerTempTable("tempTable")
  sqlContext.sql("SELECT time,To FROM tempTable")
    .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
}

2. how to get all column names from json messages? I have hundreds of columns in the json formatted message.

Thanks for your help!




Best regards,

Cui Lin




Re: How to parse Json formatted Kafka message in spark streaming

Posted by Helena Edelson <he...@datastax.com>.
Great point :) Cui, Here’s a cleaner way than I had before, w/out the use of spark sql for the mapping:

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafka.kafkaParams, Map("github" -> 5), StorageLevel.MEMORY_ONLY)
  .map{ case (k,v) => JsonParser.parse(v).extract[MonthlyCommits]}
  .saveToCassandra("githubstats","monthly_commits")


HELENA EDELSON
Senior Software Engineer,  DSE Analytics 

  

On Mar 5, 2015, at 9:33 AM, Ted Yu <yu...@gmail.com> wrote:

> Cui:
> You can check messages.partitions.size to determine whether messages is an empty RDD.
> 
> Cheers
> 
> On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
> When you use KafkaUtils.createStream with StringDecoders, it will return String objects inside your messages stream. To access the elements from the json, you could do something like the following:
> 
> 
>    val mapStream = messages.map(x=> {
>       val mapper = new ObjectMapper() with ScalaObjectMapper
>       mapper.registerModule(DefaultScalaModule)
> 
>       mapper.readValue[Map[String,Any]](x).get("time")
>     })
> 
>   
> 
> Thanks
> Best Regards
> 
> On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin <cu...@hds.com> wrote:
> Friends,
> 
> I'm trying to parse json formatted Kafka messages and then send back to cassandra.I have two problems:
> I got the exception below. How to check an empty RDD?
> Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
> at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
> at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
> at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
> 
> val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](…)
> messages.foreachRDD { rdd =>
>   val message:RDD[String] = rdd.map { y => y._2 }
>   sqlContext.jsonRDD(message).registerTempTable("tempTable")
>   sqlContext.sql("SELECT time,To FROM tempTable")
>     .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
> }
> 
> 2. how to get all column names from json messages? I have hundreds of columns in the json formatted message. 
> 
> Thanks for your help!
> 
> 
> 
> 
> Best regards,
> 
> Cui Lin
> 
> 


Re: How to parse Json formatted Kafka message in spark streaming

Posted by Ted Yu <yu...@gmail.com>.
See following thread for 1.3.0 release:
http://search-hadoop.com/m/JW1q5hV8c4

Looks like the release is around the corner.

On Thu, Mar 5, 2015 at 3:26 PM, Cui Lin <cu...@hds.com> wrote:

>   Hi, Ted,
>
>  Thanks for your reply. I noticed from the below link partitions.size
> will not work for checking empty RDD in streams. It seems that the problem
> can be solved in spark 1.3 which is no way to download at this time?
>
>  https://issues.apache.org/jira/browse/SPARK-5270
>  Best regards,
>
>  Cui Lin
>
>   From: Ted Yu <yu...@gmail.com>
> Date: Thursday, March 5, 2015 at 6:33 AM
> To: Akhil Das <ak...@sigmoidanalytics.com>
> Cc: Cui Lin <Cu...@hds.com>, "user@spark.apache.org" <
> user@spark.apache.org>
> Subject: Re: How to parse Json formatted Kafka message in spark streaming
>
>   Cui:
> You can check messages.partitions.size to determine whether messages is
> an empty RDD.
>
>  Cheers
>
> On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>>  When you use KafkaUtils.createStream with StringDecoders, it will
>> return String objects inside your messages stream. To access the elements
>> from the json, you could do something like the following:
>>
>>
>>     val mapStream = messages.map(x=> {
>>        val mapper = new ObjectMapper() with ScalaObjectMapper
>>       mapper.registerModule(DefaultScalaModule)
>>
>>        mapper.readValue[Map[String,Any]](x)*.get("time")*
>>     })
>>
>>
>>
>>  Thanks
>> Best Regards
>>
>> On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin <cu...@hds.com> wrote:
>>
>>>   Friends,
>>>
>>>   I'm trying to parse json formatted Kafka messages and then send back
>>> to cassandra.I have two problems:
>>>
>>>    1. I got the exception below. How to check an empty RDD?
>>>
>>>  Exception in thread "main" java.lang.UnsupportedOperationException:
>>> empty collection
>>>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>>>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>>>  at scala.Option.getOrElse(Option.scala:120)
>>>  at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
>>>  at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
>>>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
>>>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
>>>
>>>  val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](…)
>>>
>>> messages.foreachRDD { rdd =>
>>>   val message:RDD[String] = rdd.map { y => y._2 }
>>>   sqlContext.jsonRDD(message).registerTempTable("tempTable")
>>>   sqlContext.sql("SELECT time,To FROM tempTable")
>>>     .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
>>> }
>>>
>>>
>>>  2. how to get all column names from json messages? I have hundreds of
>>> columns in the json formatted message.
>>>
>>>  Thanks for your help!
>>>
>>>
>>>
>>>
>>>  Best regards,
>>>
>>>  Cui Lin
>>>
>>
>>
>

Re: How to parse Json formatted Kafka message in spark streaming

Posted by Cui Lin <cu...@hds.com>.
Hi, Ted,

Thanks for your reply. I noticed from the below link partitions.size will not work for checking empty RDD in streams. It seems that the problem can be solved in spark 1.3 which is no way to download at this time?

https://issues.apache.org/jira/browse/SPARK-5270
Best regards,

Cui Lin

From: Ted Yu <yu...@gmail.com>>
Date: Thursday, March 5, 2015 at 6:33 AM
To: Akhil Das <ak...@sigmoidanalytics.com>>
Cc: Cui Lin <Cu...@hds.com>>, "user@spark.apache.org<ma...@spark.apache.org>" <us...@spark.apache.org>>
Subject: Re: How to parse Json formatted Kafka message in spark streaming

Cui:
You can check messages.partitions.size to determine whether messages is an empty RDD.

Cheers

On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das <ak...@sigmoidanalytics.com>> wrote:
When you use KafkaUtils.createStream with StringDecoders, it will return String objects inside your messages stream. To access the elements from the json, you could do something like the following:


   val mapStream = messages.map(x=> {
      val mapper = new ObjectMapper() with ScalaObjectMapper
      mapper.registerModule(DefaultScalaModule)

      mapper.readValue[Map[String,Any]](x).get("time")
    })



Thanks
Best Regards

On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin <cu...@hds.com>> wrote:
Friends,

I'm trying to parse json formatted Kafka messages and then send back to cassandra.I have two problems:

  1.  I got the exception below. How to check an empty RDD?

Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)


val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](…)

messages.foreachRDD { rdd =>
  val message:RDD[String] = rdd.map { y => y._2 }
  sqlContext.jsonRDD(message).registerTempTable("tempTable")
  sqlContext.sql("SELECT time,To FROM tempTable")
    .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
}

2. how to get all column names from json messages? I have hundreds of columns in the json formatted message.

Thanks for your help!




Best regards,

Cui Lin



Re: How to parse Json formatted Kafka message in spark streaming

Posted by Ted Yu <yu...@gmail.com>.
Cui:
You can check messages.partitions.size to determine whether messages is an
empty RDD.

Cheers

On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> When you use KafkaUtils.createStream with StringDecoders, it will return
> String objects inside your messages stream. To access the elements from the
> json, you could do something like the following:
>
>
>    val mapStream = messages.map(x=> {
>       val mapper = new ObjectMapper() with ScalaObjectMapper
>       mapper.registerModule(DefaultScalaModule)
>
>       mapper.readValue[Map[String,Any]](x)*.get("time")*
>     })
>
>
>
> Thanks
> Best Regards
>
> On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin <cu...@hds.com> wrote:
>
>>   Friends,
>>
>>   I'm trying to parse json formatted Kafka messages and then send back
>> to cassandra.I have two problems:
>>
>>    1. I got the exception below. How to check an empty RDD?
>>
>>  Exception in thread "main" java.lang.UnsupportedOperationException:
>> empty collection
>>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>>  at scala.Option.getOrElse(Option.scala:120)
>>  at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
>>  at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
>>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
>>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
>>
>>  val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](…)
>>
>> messages.foreachRDD { rdd =>
>>   val message:RDD[String] = rdd.map { y => y._2 }
>>   sqlContext.jsonRDD(message).registerTempTable("tempTable")
>>   sqlContext.sql("SELECT time,To FROM tempTable")
>>     .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
>> }
>>
>>
>>  2. how to get all column names from json messages? I have hundreds of
>> columns in the json formatted message.
>>
>>  Thanks for your help!
>>
>>
>>
>>
>>  Best regards,
>>
>>  Cui Lin
>>
>
>

Re: How to parse Json formatted Kafka message in spark streaming

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
When you use KafkaUtils.createStream with StringDecoders, it will return
String objects inside your messages stream. To access the elements from the
json, you could do something like the following:


   val mapStream = messages.map(x=> {
      val mapper = new ObjectMapper() with ScalaObjectMapper
      mapper.registerModule(DefaultScalaModule)

      mapper.readValue[Map[String,Any]](x)*.get("time")*
    })



Thanks
Best Regards

On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin <cu...@hds.com> wrote:

>   Friends,
>
>   I'm trying to parse json formatted Kafka messages and then send back to
> cassandra.I have two problems:
>
>    1. I got the exception below. How to check an empty RDD?
>
>  Exception in thread "main" java.lang.UnsupportedOperationException:
> empty collection
>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>  at scala.Option.getOrElse(Option.scala:120)
>  at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
>  at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
>
>  val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](…)
>
> messages.foreachRDD { rdd =>
>   val message:RDD[String] = rdd.map { y => y._2 }
>   sqlContext.jsonRDD(message).registerTempTable("tempTable")
>   sqlContext.sql("SELECT time,To FROM tempTable")
>     .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
> }
>
>
>  2. how to get all column names from json messages? I have hundreds of
> columns in the json formatted message.
>
>  Thanks for your help!
>
>
>
>
>  Best regards,
>
>  Cui Lin
>

Re: How to parse Json formatted Kafka message in spark streaming

Posted by Helena Edelson <he...@datastax.com>.
Hi Cui,

What version of Spark are you using? There was a bug ticket that may be related to this, fixed in core/src/main/scala/org/apache/spark/rdd/RDD.scala that is merged into versions 1.3.0 and 1.2.1 . If you are using 1.1.1 that may be the reason but it’s a stretch https://issues.apache.org/jira/browse/SPARK-4968

Did you verify that you have data streaming from Kafka?

Helena
https://twitter.com/helenaedelson

On Mar 5, 2015, at 12:43 AM, Cui Lin <cu...@hds.com> wrote:

> Friends,
> 
> I'm trying to parse json formatted Kafka messages and then send back to cassandra.I have two problems:
> I got the exception below. How to check an empty RDD?
> Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
> at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
> at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
> at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
> 
> val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](…)
> messages.foreachRDD { rdd =>
>   val message:RDD[String] = rdd.map { y => y._2 }
>   sqlContext.jsonRDD(message).registerTempTable("tempTable")
>   sqlContext.sql("SELECT time,To FROM tempTable")
>     .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
> }
> 
> 2. how to get all column names from json messages? I have hundreds of columns in the json formatted message. 
> 
> Thanks for your help!
> 
> 
> 
> 
> Best regards,
> 
> Cui Lin